thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 35(+17 -18)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 3(+2 -1)
application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java 7(+4 -3)
application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java 105(+7 -98)
application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 21(+16 -5)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java 4(+3 -1)
application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java 144(+77 -67)
application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java 22(+10 -12)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java 13(+8 -5)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 26(+18 -8)
application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java 182(+43 -139)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java 26(+7 -19)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java 15(+1 -14)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcSessionCreationFuture.java 63(+0 -63)
application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java 34(+34 -0)
application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java 67(+67 -0)
application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java 18(+7 -11)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 350(+340 -10)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java 16(+16 -0)
application/src/main/proto/cluster.proto 121(+75 -46)
common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java 40(+40 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/core/ActorSystemToDeviceSessionActorMsg.java 3(+2 -1)
common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java 9(+7 -2)
common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java 4(+2 -2)
common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java 1(+0 -1)
common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicTransportToDeviceSessionActorMsg.java 18(+8 -10)
common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java 5(+5 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/session/TransportToDeviceSessionActorMsg.java 3(+2 -1)
dao/pom.xml 12(+0 -12)
docker/cluster-mode-thirdparty.yml 45(+45 -0)
pom.xml 27(+5 -22)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java 6(+3 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java 10(+5 -5)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java 5(+2 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java 5(+2 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java 10(+4 -6)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java 2(+1 -1)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java 4(+1 -3)
transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 4(+2 -2)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 8640598..b4f9102 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -60,6 +60,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
@@ -104,6 +105,10 @@ public class ActorSystemContext {
@Autowired
@Getter
+ private DataDecodingEncodingService encodingService;
+
+ @Autowired
+ @Getter
private DeviceAuthService deviceAuthService;
@Autowired
@@ -203,6 +208,10 @@ public class ActorSystemContext {
@Getter
private DeviceStateService deviceStateService;
+ @Value("${cluster.partition_id}")
+ @Getter
+ private long queuePartitionId;
+
@Value("${actors.session.sync.timeout}")
@Getter
private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 4d9b8b4..73cf788 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
@@ -45,6 +47,7 @@ import scala.concurrent.duration.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class AppActor extends RuleChainManagerActor {
@@ -89,6 +92,9 @@ public class AppActor extends RuleChainManagerActor {
@Override
protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
+ case SEND_TO_CLUSTER_MSG:
+ onPossibleClusterMsg((SendToClusterMsg) msg);
+ break;
case CLUSTER_EVENT_MSG:
broadcast(msg);
break;
@@ -112,6 +118,16 @@ public class AppActor extends RuleChainManagerActor {
return true;
}
+ private void onPossibleClusterMsg(SendToClusterMsg msg) {
+ Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
+ if (address.isPresent()) {
+ systemContext.getRpcService().tell(
+ systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
+ } else {
+ self().tell(msg.getMsg(), ActorRef.noSender());
+ }
+ }
+
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
//TODO: ashvayka handle this.
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index f5f5848..2f5e20d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -47,7 +46,7 @@ import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
-import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
@@ -57,13 +56,12 @@ import org.thingsboard.server.common.msg.core.SessionOpenMsg;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
-import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@@ -155,7 +153,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
boolean sent = rpcSubscriptions.size() > 0;
Set<SessionId> syncSessionSet = new HashSet<>();
rpcSubscriptions.entrySet().forEach(sub -> {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sub.getKey());
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
sendMsgToSessionActor(response, sub.getValue().getServer());
if (SessionType.SYNC == sub.getValue().getType()) {
syncSessionSet.add(sub.getKey());
@@ -197,7 +195,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (data != null) {
logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
}
}
@@ -209,7 +207,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
if (remainingAcks == 0) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
}
}
}
@@ -247,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
body.getMethod(),
body.getParams()
);
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
sendMsgToSessionActor(response, server);
};
}
@@ -301,14 +299,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
}
@Override
public void onFailure(Throwable t) {
if (t instanceof Exception) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
} else {
logger.error("[{}] Failed to process attributes request", deviceId, t);
}
@@ -390,14 +388,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
if (data != null) {
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
}
}
@@ -408,7 +406,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
pendingMsgs.put(tbMsg.getId(), pendingMsgData);
scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
} else {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
}
context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
@@ -435,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (notification != null) {
ToDeviceMsg finalNotification = notification;
attributeSubscriptions.entrySet().forEach(sub -> {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(finalNotification, sub.getKey());
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
sendMsgToSessionActor(response, sub.getValue().getServer());
});
}
@@ -461,7 +459,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
BasicCommandAckResponse response = success
? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
: BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
}
}
}
@@ -516,11 +514,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void sendMsgToSessionActor(ToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
+ private void sendMsgToSessionActor(ActorSystemToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
if (sessionAddress.isPresent()) {
ServerAddress address = sessionAddress.get();
logger.debug("{} Forwarding msg: {}", address, response);
- systemContext.getRpcService().tell(sessionAddress.get(), response);
+ systemContext.getRpcService().tell(systemContext.getEncodingService()
+ .convertToProtoDataMessage(sessionAddress.get(), response));
} else {
systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
}
@@ -528,7 +527,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processCredentialsUpdate() {
sessions.forEach((k, v) -> {
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
});
attributeSubscriptions.clear();
rpcSubscriptions.clear();
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 5e16aab..f6b30bf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -82,7 +82,8 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void sendPluginRpcMsg(RpcMsg msg) {
- this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
+ //ToDO is this a cluster messsage?
+// this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index dcabedc..c43d62d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
@@ -100,7 +101,7 @@ public final class SharedPluginProcessingContext {
}
public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
- forward(msg.getDeviceId(), msg, rpcService::tell);
+ forward(msg.getDeviceId(), msg);
}
public void sendRpcRequest(ToDeviceRpcRequest msg) {
@@ -109,11 +110,11 @@ public final class SharedPluginProcessingContext {
// forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
}
- private <T> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
+ private <T extends TbActorMsg> void forward(DeviceId deviceId, T msg) {
Optional<ServerAddress> instance = routingService.resolveById(deviceId);
if (instance.isPresent()) {
log.trace("[{}] Forwarding msg {} to remote device actor!", pluginId, msg);
- rpcFunction.accept(instance.get(), msg);
+ rpcService.tell(systemContext.getEncodingService().convertToProtoDataMessage(instance.get(), msg));
} else {
log.trace("[{}] Forwarding msg {} to local device actor!", pluginId, msg);
parentActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
index f856ed6..6e47e35 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
@@ -15,7 +15,7 @@
*/
package org.thingsboard.server.actors.plugin;
-import com.hazelcast.util.function.Consumer;
+import java.util.function.Consumer;
import org.thingsboard.server.extensions.api.exception.AccessDeniedException;
import org.thingsboard.server.extensions.api.exception.EntityNotFoundException;
import org.thingsboard.server.extensions.api.exception.InternalErrorException;
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index bc36dc8..14bb636 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,30 +17,10 @@ package org.thingsboard.server.actors.rpc;
import akka.actor.ActorRef;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.util.SerializationUtils;
-import org.springframework.util.StringUtils;
-import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ActorService;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
-
-import java.io.Serializable;
-import java.util.UUID;
/**
* @author Andrew Shvayka
@@ -48,15 +28,12 @@ import java.util.UUID;
@Slf4j
public class BasicRpcSessionListener implements GrpcSessionListener {
- public static final String SESSION_RECEIVED_SESSION_ACTOR_MSG = "{} session [{}] received session actor msg {}";
- private final ActorSystemContext context;
private final ActorService service;
private final ActorRef manager;
private final ActorRef self;
- public BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) {
- this.context = context;
- this.service = context.getActorService();
+ public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
+ this.service = service;
this.manager = manager;
this.self = self;
}
@@ -76,47 +53,11 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
}
@Override
- public void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg) {
- if (log.isTraceEnabled()) {
- log.trace("{} session [{}] received plugin msg {}", getType(session), session.getRemoteServer(), msg);
- }
- service.onMsg(convert(session.getRemoteServer(), msg));
- }
-
- @Override
- public void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg) {
- log.trace("{} session [{}] received device actor msg {}", getType(session), session.getRemoteServer(), msg);
- service.onMsg((DeviceToDeviceActorMsg) deserialize(msg.getData().toByteArray()));
- }
-
- @Override
- public void onToDeviceActorNotificationRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg) {
- log.trace("{} session [{}] received device actor notification msg {}", getType(session), session.getRemoteServer(), msg);
- service.onMsg((ToDeviceActorNotificationMsg) deserialize(msg.getData().toByteArray()));
- }
-
- @Override
- public void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg((ToDeviceSessionActorMsg) deserialize(msg.getData().toByteArray()));
- }
-
- @Override
- public void onToDeviceRpcRequestRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg(deserialize(session.getRemoteServer(), msg));
- }
-
- @Override
- public void onFromDeviceRpcResponseRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg(deserialize(session.getRemoteServer(), msg));
- }
-
- @Override
- public void onToAllNodesRpcMessage(GrpcSession session, ClusterAPIProtos.ToAllNodesRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg((ToAllNodesMsg) deserialize(msg.getData().toByteArray()));
+ public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
+ log.trace("{} Service [{}] received session actor msg {}", getType(session),
+ session.getRemoteServer(),
+ clusterMessage);
+ service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
}
@Override
@@ -130,37 +71,5 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
return session.isClient() ? "Client" : "Server";
}
- private static PluginRpcMsg convert(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcMessage msg) {
- ClusterAPIProtos.PluginAddress address = msg.getAddress();
- TenantId tenantId = new TenantId(toUUID(address.getTenantId()));
- PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
- RpcMsg rpcMsg = new RpcMsg(serverAddress, msg.getClazz(), msg.getData().toByteArray());
- return new PluginRpcMsg(tenantId, pluginId, rpcMsg);
- }
-
- private static UUID toUUID(ClusterAPIProtos.Uid uid) {
- return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
- }
-
- private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
- TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
- DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
-
- ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
- ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
-
- return new ToDeviceRpcRequestActorMsg(serverAddress, request);
- }
-
- private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
- RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
- FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
- return new ToPluginRpcResponseDeviceMsg(null, null, response);
- }
-
- @SuppressWarnings("unchecked")
- private static <T extends Serializable> T deserialize(byte[] data) {
- return (T) SerializationUtils.deserialize(data);
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
index 3718a22..2dd949e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
@@ -23,5 +23,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
*/
@Data
public final class RpcBroadcastMsg {
- private final ClusterAPIProtos.ToRpcServerMessage msg;
+ private final ClusterAPIProtos.ClusterMessage msg;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index ba20013..c5c6553 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -40,7 +40,7 @@ public class RpcManagerActor extends ContextAwareActor {
private final Map<ServerAddress, SessionActorInfo> sessionActors;
- private final Map<ServerAddress, Queue<ClusterAPIProtos.ToRpcServerMessage>> pendingMsgs;
+ private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
private final ServerAddress instance;
@@ -65,8 +65,8 @@ public class RpcManagerActor extends ContextAwareActor {
@Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof RpcSessionTellMsg) {
- onMsg((RpcSessionTellMsg) msg);
+ if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+ onMsg((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcBroadcastMsg) {
onMsg((RpcBroadcastMsg) msg);
} else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -84,27 +84,32 @@ public class RpcManagerActor extends ContextAwareActor {
private void onMsg(RpcBroadcastMsg msg) {
log.debug("Forwarding msg to session actors {}", msg);
- sessionActors.keySet().forEach(address -> onMsg(new RpcSessionTellMsg(address, msg.getMsg())));
+ sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
}
- private void onMsg(RpcSessionTellMsg msg) {
- ServerAddress address = msg.getServerAddress();
- SessionActorInfo session = sessionActors.get(address);
- if (session != null) {
- log.debug("{} Forwarding msg to session actor", address);
- session.actor.tell(msg, ActorRef.noSender());
- } else {
- log.debug("{} Storing msg to pending queue", address);
- Queue<ClusterAPIProtos.ToRpcServerMessage> queue = pendingMsgs.get(address);
- if (queue == null) {
- queue = new LinkedList<>();
- pendingMsgs.put(address, queue);
+ private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
+ if (msg.hasServerAddress()) {
+ ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(),
+ msg.getServerAddress().getPort());
+ SessionActorInfo session = sessionActors.get(address);
+ if (session != null) {
+ log.debug("{} Forwarding msg to session actor", address);
+ session.getActor().tell(msg, ActorRef.noSender());
+ } else {
+ log.debug("{} Storing msg to pending queue", address);
+ Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
+ if (queue == null) {
+ queue = new LinkedList<>();
+ pendingMsgs.put(new ServerAddress(
+ msg.getServerAddress().getHost(), msg.getServerAddress().getPort()), queue);
+ }
+ queue.add(msg);
}
- queue.add(msg.getMsg());
+ } else {
+ logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
}
}
-
@Override
public void postStop() {
sessionActors.clear();
@@ -167,10 +172,10 @@ public class RpcManagerActor extends ContextAwareActor {
private void register(ServerAddress remoteAddress, UUID uuid, ActorRef sender) {
sessionActors.put(remoteAddress, new SessionActorInfo(uuid, sender));
log.debug("[{}][{}] Registering session actor.", remoteAddress, uuid);
- Queue<ClusterAPIProtos.ToRpcServerMessage> data = pendingMsgs.remove(remoteAddress);
+ Queue<ClusterAPIProtos.ClusterMessage> data = pendingMsgs.remove(remoteAddress);
if (data != null) {
log.debug("[{}][{}] Forwarding {} pending messages.", remoteAddress, uuid, data.size());
- data.forEach(msg -> sender.tell(new RpcSessionTellMsg(remoteAddress, msg), ActorRef.noSender()));
+ data.forEach(msg -> sender.tell(new RpcSessionTellMsg(msg), ActorRef.noSender()));
} else {
log.debug("[{}][{}] No pending messages to forward.", remoteAddress, uuid);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index a187444..c9cf869 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -32,6 +32,8 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
import java.util.UUID;
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE;
+
/**
* @author Andrew Shvayka
*/
@@ -56,15 +58,15 @@ public class RpcSessionActor extends ContextAwareActor {
@Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof RpcSessionTellMsg) {
- tell((RpcSessionTellMsg) msg);
+ if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+ tell((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcSessionCreateRequestMsg) {
initSession((RpcSessionCreateRequestMsg) msg);
}
}
- private void tell(RpcSessionTellMsg msg) {
- session.sendMsg(msg.getMsg());
+ private void tell(ClusterAPIProtos.ClusterMessage msg) {
+ session.sendMsg(msg);
}
@Override
@@ -76,7 +78,7 @@ public class RpcSessionActor extends ContextAwareActor {
private void initSession(RpcSessionCreateRequestMsg msg) {
log.info("[{}] Initializing session", context().self());
ServerAddress remoteServer = msg.getRemoteAddress();
- listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
+ listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
if (msg.getRemoteAddress() == null) {
// Server session
session = new GrpcSession(listener);
@@ -91,7 +93,7 @@ public class RpcSessionActor extends ContextAwareActor {
session.initInputStream();
ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
- StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream = stub.handlePluginMsgs(session.getInputStream());
+ StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream = stub.handleMsgs(session.getInputStream());
session.setOutputStream(outputStream);
session.initOutputStream();
@@ -115,11 +117,10 @@ public class RpcSessionActor extends ContextAwareActor {
}
}
- private ClusterAPIProtos.ToRpcServerMessage toConnectMsg() {
+ private ClusterAPIProtos.ClusterMessage toConnectMsg() {
ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress();
- return ClusterAPIProtos.ToRpcServerMessage.newBuilder().setConnectMsg(
- ClusterAPIProtos.ConnectRpcMessage.newBuilder().setServerAddress(
- ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost()).setPort(instance.getPort()).build()).build()).build();
-
+ return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAddress(
+ ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost())
+ .setPort(instance.getPort()).build()).build();
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
index 5bcf1d6..0c1136e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
@@ -30,6 +30,6 @@ public final class RpcSessionCreateRequestMsg {
private final UUID msgUid;
private final ServerAddress remoteAddress;
- private final StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver;
+ private final StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
index 5a61044..858e3aa 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
@@ -24,6 +24,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
*/
@Data
public final class RpcSessionTellMsg {
- private final ServerAddress serverAddress;
- private final ClusterAPIProtos.ToRpcServerMessage msg;
+ private final ClusterAPIProtos.ClusterMessage msg;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 0508123..85d1b54 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.script.NashornJsEngine;
import scala.concurrent.duration.Duration;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -63,15 +64,24 @@ class DefaultTbContext implements TbContext {
@Override
public void tellNext(TbMsg msg, String relationType) {
- tellNext(msg, relationType, null);
+ tellNext(msg, Collections.singleton(relationType), null);
+ }
+
+ @Override
+ public void tellNext(TbMsg msg, Set<String> relationTypes) {
+ tellNext(msg, relationTypes, null);
}
@Override
public void tellNext(TbMsg msg, String relationType, Throwable th) {
+ tellNext(msg, Collections.singleton(relationType), th);
+ }
+
+ private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
- mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
+ relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
- nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
+ nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor());
}
@Override
@@ -99,12 +109,12 @@ class DefaultTbContext implements TbContext {
@Override
public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+ return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
}
@Override
public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), 0L);
+ return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
}
@Override
@@ -118,11 +128,6 @@ class DefaultTbContext implements TbContext {
}
@Override
- public void tellNext(TbMsg msg, Set<String> relationTypes) {
- relationTypes.forEach(type -> tellNext(msg, type));
- }
-
- @Override
public ListeningExecutor getJsExecutor() {
return mainCtx.getJsExecutor();
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 4812002..3ba646a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -54,6 +54,8 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
break;
+ case CLUSTER_EVENT_MSG:
+ break;
default:
return false;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index dda12e5..ac902a7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void reprocess(List<RuleNode> ruleNodeList) {
for (RuleNode ruleNode : ruleNodeList) {
- for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), 0L)) {
+ for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
}
}
if (firstNode != null) {
- for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), 0L)) {
+ for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), systemContext.getQueuePartitionId())) {
pushMsgToNode(firstNode, tbMsg, "");
}
}
@@ -206,9 +206,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
checkActive();
RuleNodeId originator = envelope.getOriginator();
- String targetRelationType = envelope.getRelationType();
List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
- .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
+ .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
.collect(Collectors.toList());
TbMsg msg = envelope.getMsg();
@@ -237,6 +236,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
+ private boolean contains(Set<String> relationTypes, String type) {
+ if (relationTypes == null) {
+ return true;
+ }
+ for (String relationType : relationTypes) {
+ if (relationType.equalsIgnoreCase(type)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
RuleChainId targetRCId = new RuleChainId(target.getId());
TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
@@ -269,6 +280,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
// We don't put firstNodeId because it may change over time;
- return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, 0L);
+ return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, systemContext.getQueuePartitionId());
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index 054284d..c0a475c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
+import java.util.Set;
+
/**
* Created by ashvayka on 19.03.18.
*/
@@ -28,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg;
final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
private final RuleNodeId originator;
- private final String relationType;
+ private final Set<String> relationTypes;
private final TbMsg msg;
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index e15eb2f..5a097df 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
@@ -27,10 +28,11 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
- void onMsg(ServiceToRuleEngineMsg msg);
+ void onMsg(SendToClusterMsg msg);
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
+ void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 1d9c671..3624127 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -39,8 +39,12 @@ public abstract class ContextAwareActor extends UntypedActor {
logger.debug("Processing msg: {}", msg);
}
if (msg instanceof TbActorMsg) {
- if(!process((TbActorMsg) msg)){
- logger.warning("Unknown message: {}!", msg);
+ try {
+ if (!process((TbActorMsg) msg)) {
+ logger.warning("Unknown message: {}!", msg);
+ }
+ } catch (Exception e) {
+ throw e;
}
} else {
logger.warning("Unknown message: {}!", msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index df0d122..c80e913 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -19,6 +19,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -27,25 +28,23 @@ import org.thingsboard.server.actors.app.AppActor;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcManagerActor;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -55,7 +54,8 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import java.util.Optional;
+
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
@Service
@Slf4j
@@ -127,7 +127,7 @@ public class DefaultActorService implements ActorService {
}
@Override
- public void onMsg(ServiceToRuleEngineMsg msg) {
+ public void onMsg(SendToClusterMsg msg) {
appActor.tell(msg, ActorRef.noSender());
}
@@ -149,53 +149,6 @@ public class DefaultActorService implements ActorService {
appActor.tell(msg, ActorRef.noSender());
}
- @Override
- public void onMsg(ToPluginActorMsg msg) {
- log.trace("Processing plugin rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(DeviceToDeviceActorMsg msg) {
- log.trace("Processing device rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(ToDeviceActorNotificationMsg msg) {
- log.trace("Processing notification rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(ToDeviceSessionActorMsg msg) {
- log.trace("Processing session rpc msg: {}", msg);
- sessionManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(ToAllNodesMsg msg) {
- log.trace("Processing broadcast rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(RpcSessionCreateRequestMsg msg) {
- log.trace("Processing session create msg: {}", msg);
- rpcManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(RpcSessionTellMsg msg) {
- log.trace("Processing session rpc msg: {}", msg);
- rpcManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(RpcBroadcastMsg msg) {
- log.trace("Processing broadcast rpc msg: {}", msg);
- rpcManagerActor.tell(msg, ActorRef.noSender());
- }
@Override
public void onServerAdded(ServerInstance server) {
@@ -223,28 +176,29 @@ public class DefaultActorService implements ActorService {
@Override
public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
- Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
- if (address.isPresent()) {
- rpcService.tell(address.get(), msg);
- } else {
- onMsg(msg);
- }
+ appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
}
@Override
public void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType) {
log.trace("[{}] Processing onDeviceNameOrTypeUpdate event, deviceName: {}, deviceType: {}", deviceId, deviceName, deviceType);
DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType);
- Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
- if (address.isPresent()) {
- rpcService.tell(address.get(), msg);
- } else {
- onMsg(msg);
- }
+ appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
+ }
+
+ @Override
+ public void onMsg(ServiceToRuleEngineMsg msg) {
+ appActor.tell(msg, ActorRef.noSender());
}
public void broadcast(ToAllNodesMsg msg) {
- rpcService.broadcast(msg);
+ actorContext.getEncodingService().encode(msg);
+ rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
+ .newBuilder()
+ .setPayload(ByteString
+ .copyFrom(actorContext.getEncodingService().encode(msg)))
+ .setMessageType(CLUSTER_ACTOR_MESSAGE)
+ .build()));
appActor.tell(msg, ActorRef.noSender());
}
@@ -253,4 +207,60 @@ public class DefaultActorService implements ActorService {
this.sessionManagerActor.tell(msg, ActorRef.noSender());
this.rpcManagerActor.tell(msg, ActorRef.noSender());
}
+
+ @Override
+ public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
+ ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
+ log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
+ if(log.isDebugEnabled()){
+ log.info("MSG: ", msg);
+ }
+ switch (msg.getMessageType()) {
+ case CLUSTER_ACTOR_MESSAGE:
+ java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
+ .decode(msg.getPayload().toByteArray());
+ if (decodedMsg.isPresent()) {
+ appActor.tell(decodedMsg.get(), ActorRef.noSender());
+ } else {
+ log.error("Error during decoding cluster proto message");
+ }
+ break;
+ case TO_ALL_NODES_MSG:
+ //TODO
+ break;
+ case CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE:
+ actorContext.getTsSubService().onNewRemoteSubscription(serverAddress, msg.getPayload().toByteArray());
+ break;
+ case CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE:
+ actorContext.getTsSubService().onRemoteSubscriptionUpdate(serverAddress, msg.getPayload().toByteArray());
+ break;
+ case CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE:
+ actorContext.getTsSubService().onRemoteSubscriptionClose(serverAddress, msg.getPayload().toByteArray());
+ break;
+ case CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE:
+ actorContext.getTsSubService().onRemoteSessionClose(serverAddress, msg.getPayload().toByteArray());
+ break;
+ case CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE:
+ actorContext.getTsSubService().onRemoteAttributesUpdate(serverAddress, msg.getPayload().toByteArray());
+ break;
+ case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
+ actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
+ break;
+ }
+ }
+
+ @Override
+ public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
+ rpcManagerActor.tell(msg, ActorRef.noSender());
+ }
+
+ @Override
+ public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
+ rpcManagerActor.tell(msg, ActorRef.noSender());
+ }
+
+ @Override
+ public void onBroadcastMsg(RpcBroadcastMsg msg) {
+ rpcManagerActor.tell(msg, ActorRef.noSender());
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
index bbaef5d..42e127e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
@@ -44,7 +45,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
this.sessionId = sessionId;
}
- protected abstract void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg);
+ protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg);
protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg);
@@ -62,12 +63,12 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
protected void cleanupSession(ActorContext ctx) {
}
- protected void updateSessionCtx(ToDeviceActorSessionMsg msg, SessionType type) {
+ protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) {
sessionCtx = msg.getSessionMsg().getSessionContext();
deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type);
}
- protected DeviceToDeviceActorMsg toDeviceMsg(ToDeviceActorSessionMsg msg) {
+ protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) {
AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg();
return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg());
}
@@ -86,23 +87,20 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
return address;
}
- protected Optional<ServerAddress> forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
+ protected Optional<ServerAddress> forwardToAppActorIfAddressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
+
Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
if (!newAddress.equals(oldAddress)) {
- if (newAddress.isPresent()) {
- systemContext.getRpcService().tell(newAddress.get(),
- toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
- } else {
- getAppActor().tell(toForward, ctx.self());
- }
+ getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward
+ .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self());
}
return newAddress;
}
protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> address) {
if (address.isPresent()) {
- systemContext.getRpcService().tell(address.get(),
- toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
+ systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(),
+ toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer())));
} else {
getAppActor().tell(toForward, ctx.self());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index fa5287f..83d56e0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -46,7 +46,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
@Override
- protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+ protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
updateSessionCtx(msg, SessionType.ASYNC);
if (firstMsg) {
toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
index 9d324c5..f67d46b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
-import akka.japi.Function;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -25,8 +24,8 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.SessionMsg;
import org.thingsboard.server.common.msg.session.SessionType;
@@ -63,38 +62,37 @@ public class SessionActor extends ContextAwareActor {
@Override
protected boolean process(TbActorMsg msg) {
- //TODO Move everything here, to work with TbActorMsg
- return false;
- }
-
- @Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("[{}] Processing: {}.", sessionId, msg);
- if (msg instanceof ToDeviceActorSessionMsg) {
- processDeviceMsg((ToDeviceActorSessionMsg) msg);
- } else if (msg instanceof ToDeviceSessionActorMsg) {
- processToDeviceMsg((ToDeviceSessionActorMsg) msg);
- } else if (msg instanceof SessionTimeoutMsg) {
- processTimeoutMsg((SessionTimeoutMsg) msg);
- } else if (msg instanceof SessionCtrlMsg) {
- processSessionCtrlMsg((SessionCtrlMsg) msg);
- } else if (msg instanceof ClusterEventMsg) {
- processClusterEvent((ClusterEventMsg) msg);
- } else {
- logger.warning("[{}] Unknown msg: {}", sessionId, msg);
+ switch (msg.getMsgType()) {
+ case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG:
+ processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg);
+ break;
+ case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
+ processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg);
+ break;
+ case SESSION_TIMEOUT_MSG:
+ processTimeoutMsg((SessionTimeoutMsg) msg);
+ break;
+ case SESSION_CTRL_MSG:
+ processSessionCloseMsg((SessionCtrlMsg) msg);
+ break;
+ case CLUSTER_EVENT_MSG:
+ processClusterEvent((ClusterEventMsg) msg);
+ break;
+ default: return false;
}
+ return true;
}
private void processClusterEvent(ClusterEventMsg msg) {
processor.processClusterEvent(context(), msg);
}
- private void processDeviceMsg(ToDeviceActorSessionMsg msg) {
+ private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) {
initProcessor(msg);
processor.processToDeviceActorMsg(context(), msg);
}
- private void processToDeviceMsg(ToDeviceSessionActorMsg msg) {
+ private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) {
processor.processToDeviceMsg(context(), msg.getMsg());
}
@@ -106,7 +104,7 @@ public class SessionActor extends ContextAwareActor {
}
}
- private void processSessionCtrlMsg(SessionCtrlMsg msg) {
+ private void processSessionCloseMsg(SessionCtrlMsg msg) {
if (processor != null) {
processor.processSessionCtrlMsg(context(), msg);
} else if (msg instanceof SessionCloseMsg) {
@@ -116,7 +114,7 @@ public class SessionActor extends ContextAwareActor {
}
}
- private void initProcessor(ToDeviceActorSessionMsg msg) {
+ private void initProcessor(TransportToDeviceSessionActorMsg msg) {
if (processor == null) {
SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg();
if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index b5b1791..1f8bb6d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import akka.actor.*;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -33,8 +32,9 @@ import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
public class SessionManagerActor extends ContextAwareActor {
@@ -104,7 +104,7 @@ public class SessionManagerActor extends ContextAwareActor {
}
private void forwardToSessionActor(SessionAwareMsg msg) {
- if (msg instanceof ToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
+ if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
String sessionIdStr = msg.getSessionId().toUidStr();
ActorRef sessionActor = sessionActors.get(sessionIdStr);
if (sessionActor != null) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
index 7f520b5..a0f0a88 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.*;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.msg.session.ex.SessionException;
@@ -41,7 +41,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
@Override
- protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+ protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
updateSessionCtx(msg, SessionType.SYNC);
pendingMsg = toDeviceMsg(msg);
pendingResponse = true;
@@ -73,7 +73,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
@Override
public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
if (pendingResponse) {
- Optional<ServerAddress> newTargetServer = forwardToAppActorIfAdressChanged(context, pendingMsg, currentTargetServer);
+ Optional<ServerAddress> newTargetServer = forwardToAppActorIfAddressChanged(context, pendingMsg, currentTargetServer);
if (logger.isDebugEnabled()) {
if (!newTargetServer.equals(currentTargetServer)) {
if (newTargetServer.isPresent()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 4e9b8db..771b85b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -89,7 +89,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
- Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+ Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
onSuccess.accept(tbMsg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
index 7d6dbca..d015fe0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
@@ -17,13 +17,20 @@ package org.thingsboard.server.actors.shared;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
import java.io.Serializable;
@Data
-public class SessionTimeoutMsg implements Serializable {
+public class SessionTimeoutMsg implements Serializable, TbActorMsg {
private static final long serialVersionUID = 1L;
private final SessionId sessionId;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_TIMEOUT_MSG;
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
index 03c9694..bc386eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
@@ -20,7 +20,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
/**
* @author Andrew Shvayka
@@ -29,8 +29,6 @@ import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
@EqualsAndHashCode(exclude = {"serverInfo", "serverAddress"})
public final class ServerInstance implements Comparable<ServerInstance> {
- @Getter(AccessLevel.PACKAGE)
- private final ServerInfo serverInfo;
@Getter
private final String host;
@Getter
@@ -38,8 +36,13 @@ public final class ServerInstance implements Comparable<ServerInstance> {
@Getter
private final ServerAddress serverAddress;
- public ServerInstance(ServerInfo serverInfo) {
- this.serverInfo = serverInfo;
+ public ServerInstance(ServerAddress serverAddress) {
+ this.serverAddress = serverAddress;
+ this.host = serverAddress.getHost();
+ this.port = serverAddress.getPort();
+ }
+
+ public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
this.host = serverInfo.getHost();
this.port = serverInfo.getPort();
this.serverAddress = new ServerAddress(host, port);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 818d2b1..6002b0e 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -15,8 +15,9 @@
*/
package org.thingsboard.server.service.cluster.discovery;
-import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
@@ -31,15 +32,17 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.utils.MiscUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import java.io.IOException;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -67,6 +70,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@Autowired
private ServerInstanceService serverInstance;
+ @Autowired
+ @Lazy
+ private TelemetrySubscriptionService tsSubService;
+
private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
private CuratorFramework client;
@@ -113,7 +120,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
nodePath = client.create()
.creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", self.getServerInfo().toByteArray());
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
} catch (Exception e) {
log.error("Failed to create ZK node", e);
@@ -144,8 +151,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.filter(cd -> !cd.getPath().equals(nodePath))
.map(cd -> {
try {
- return new ServerInstance(ServerInfo.parseFrom(cd.getData()));
- } catch (InvalidProtocolBufferException e) {
+ return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+ } catch (NoSuchElementException e) {
log.error("Failed to decode ZK node", e);
throw new RuntimeException(e);
}
@@ -186,20 +193,23 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
ServerInstance instance;
try {
- instance = new ServerInstance(ServerInfo.parseFrom(data.getData()));
- } catch (IOException e) {
+ ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
+ instance = new ServerInstance(serverAddress);
+ } catch (SerializationException e) {
log.error("Failed to decode server instance for node {}", data.getPath(), e);
throw e;
}
log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
+ tsSubService.onClusterUpdate();
listeners.forEach(listener -> listener.onServerAdded(instance));
break;
case CHILD_UPDATED:
listeners.forEach(listener -> listener.onServerUpdated(instance));
break;
case CHILD_REMOVED:
+ tsSubService.onClusterUpdate();
listeners.forEach(listener -> listener.onServerRemoved(instance));
break;
default:
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 4067797..8ab6f99 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -107,7 +107,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public void onServerAdded(ServerInstance server) {
- log.debug("On server added event: {}", server);
+ log.info("On server added event: {}", server);
addNode(server);
logCircle();
}
@@ -119,7 +119,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public void onServerRemoved(ServerInstance server) {
- log.debug("On server removed event: {}", server);
+ log.info("On server removed event: {}", server);
removeNode(server);
logCircle();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index 27334c6..9236219 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -22,29 +22,23 @@ import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.util.SerializationUtils;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
+
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -58,13 +52,17 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
@Autowired
private ServerInstanceService instanceService;
+ @Autowired
+ private DataDecodingEncodingService encodingService;
+
private RpcMsgListener listener;
private Server server;
private ServerInstance instance;
- private ConcurrentMap<UUID, RpcSessionCreationFuture> pendingSessionMap = new ConcurrentHashMap<>();
+ private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap =
+ new ConcurrentHashMap<>();
public void init(RpcMsgListener listener) {
this.listener = listener;
@@ -82,11 +80,11 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
- public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> msg) {
- RpcSessionCreationFuture future = pendingSessionMap.remove(msgUid);
- if (future != null) {
+ public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream) {
+ BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = pendingSessionMap.remove(msgUid);
+ if (queue != null) {
try {
- future.onMsg(msg);
+ queue.put(inputStream);
} catch (InterruptedException e) {
log.warn("Failed to report created session!");
Thread.currentThread().interrupt();
@@ -97,11 +95,13 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
- public StreamObserver<ClusterAPIProtos.ToRpcServerMessage> handlePluginMsgs(StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver) {
+ public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(
+ StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver) {
log.info("Processing new session.");
return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver));
}
+
@PreDestroy
public void stop() {
if (server != null) {
@@ -117,65 +117,18 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
}
- @Override
- public void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceActorRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceActorNotificationRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
@Override
- public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToPluginRpcResponseRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceSessionActorRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(PluginRpcMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToPluginRpcMsg(toProtoMsg(toForward)).build();
- tell(toForward.getRpcMsg().getServerAddress(), msg);
- }
-
- @Override
- public void broadcast(ToAllNodesMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToAllNodesRpcMsg(toProtoMsg(toForward)).build();
- listener.onMsg(new RpcBroadcastMsg(msg));
- }
-
- private void tell(ServerAddress serverAddress, ClusterAPIProtos.ToRpcServerMessage msg) {
- listener.onMsg(new RpcSessionTellMsg(serverAddress, msg));
+ public void broadcast(RpcBroadcastMsg msg) {
+ listener.onBroadcastMsg(msg);
}
- private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> createSession(RpcSessionCreateRequestMsg msg) {
- RpcSessionCreationFuture future = new RpcSessionCreationFuture();
- pendingSessionMap.put(msg.getMsgUid(), future);
- listener.onMsg(msg);
+ private StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg msg) {
+ BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = new ArrayBlockingQueue<>(1);
+ pendingSessionMap.put(msg.getMsgUid(), queue);
+ listener.onRpcSessionCreateRequestMsg(msg);
try {
- StreamObserver<ClusterAPIProtos.ToRpcServerMessage> observer = future.get();
+ StreamObserver<ClusterAPIProtos.ClusterMessage> observer = queue.take();
log.info("Processed new session.");
return observer;
} catch (Exception e) {
@@ -184,76 +137,27 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
}
- private static ClusterAPIProtos.ToDeviceActorRpcMessage toProtoMsg(DeviceToDeviceActorMsg msg) {
- return ClusterAPIProtos.ToDeviceActorRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
-
- private static ClusterAPIProtos.ToDeviceActorNotificationRpcMessage toProtoMsg(ToDeviceActorNotificationMsg msg) {
- return ClusterAPIProtos.ToDeviceActorNotificationRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
-
- private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
- ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
- ToDeviceRpcRequest request = msg.getMsg();
-
- builder.setDeviceTenantId(toUid(msg.getTenantId()));
- builder.setDeviceId(toUid(msg.getDeviceId()));
-
- builder.setMsgId(toUid(request.getId()));
- builder.setOneway(request.isOneway());
- builder.setExpTime(request.getExpirationTime());
- builder.setMethod(request.getBody().getMethod());
- builder.setParams(request.getBody().getParams());
-
- return builder.build();
- }
-
- private static ClusterAPIProtos.ToPluginRpcResponseRpcMessage toProtoMsg(ToPluginRpcResponseDeviceMsg msg) {
- ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
- FromDeviceRpcResponse request = msg.getResponse();
-
- builder.setMsgId(toUid(request.getId()));
- request.getResponse().ifPresent(builder::setResponse);
- request.getError().ifPresent(e -> builder.setError(e.name()));
-
- return builder.build();
- }
-
- private ClusterAPIProtos.ToAllNodesRpcMessage toProtoMsg(ToAllNodesMsg msg) {
- return ClusterAPIProtos.ToAllNodesRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
-
-
- private ClusterAPIProtos.ToPluginRpcMessage toProtoMsg(PluginRpcMsg msg) {
- return ClusterAPIProtos.ToPluginRpcMessage.newBuilder()
- .setClazz(msg.getRpcMsg().getMsgClazz())
- .setData(ByteString.copyFrom(msg.getRpcMsg().getMsgData()))
- .setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
- .setTenantId(toUid(msg.getPluginTenantId().getId()))
- .setPluginId(toUid(msg.getPluginId().getId()))
- .build()
- ).build();
- }
-
- private static ClusterAPIProtos.Uid toUid(EntityId uuid) {
- return toUid(uuid.getId());
+ @Override
+ public void tell(ClusterAPIProtos.ClusterMessage message) {
+ listener.onSendMsg(message);
}
- private static ClusterAPIProtos.Uid toUid(UUID uuid) {
- return ClusterAPIProtos.Uid.newBuilder().setPluginUuidMsb(uuid.getMostSignificantBits()).setPluginUuidLsb(
- uuid.getLeastSignificantBits()).build();
+ @Override
+ public void tell(ServerAddress serverAddress, TbActorMsg actorMsg) {
+ listener.onSendMsg(encodingService.convertToProtoDataMessage(serverAddress, actorMsg));
}
- private static ClusterAPIProtos.ToDeviceSessionActorRpcMessage toProtoMsg(ToDeviceSessionActorMsg msg) {
- return ClusterAPIProtos.ToDeviceSessionActorRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
+ @Override
+ public void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data) {
+ ClusterAPIProtos.ClusterMessage msg = ClusterAPIProtos.ClusterMessage
+ .newBuilder()
+ .setServerAddress(ClusterAPIProtos.ServerAddress
+ .newBuilder()
+ .setHost(serverAddress.getHost())
+ .setPort(serverAddress.getPort())
+ .build())
+ .setMessageType(msgType)
+ .setPayload(ByteString.copyFrom(data)).build();
+ listener.onSendMsg(msg);
}
-
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 6aefe46..de29b89 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -16,15 +16,10 @@
package org.thingsboard.server.service.cluster.rpc;
import io.grpc.stub.StreamObserver;
+import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.util.UUID;
@@ -35,20 +30,13 @@ public interface ClusterRpcService {
void init(RpcMsgListener listener);
- void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward);
+ void broadcast(RpcBroadcastMsg msg);
- void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward);
+ void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream);
- void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
+ void tell(ClusterAPIProtos.ClusterMessage message);
- void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
-
- void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
-
- void tell(PluginRpcMsg toForward);
-
- void broadcast(ToAllNodesMsg msg);
-
- void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
+ void tell(ServerAddress serverAddress, TbActorMsg actorMsg);
+ void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index c403895..7216c43 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -33,8 +33,8 @@ public final class GrpcSession implements Closeable {
private final UUID sessionId;
private final boolean client;
private final GrpcSessionListener listener;
- private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream;
- private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream;
+ private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
+ private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
private boolean connected;
private ServerAddress remoteServer;
@@ -56,17 +56,17 @@ public final class GrpcSession implements Closeable {
}
public void initInputStream() {
- this.inputStream = new StreamObserver<ClusterAPIProtos.ToRpcServerMessage>() {
+ this.inputStream = new StreamObserver<ClusterAPIProtos.ClusterMessage>() {
@Override
- public void onNext(ClusterAPIProtos.ToRpcServerMessage msg) {
- if (!connected && msg.hasConnectMsg()) {
+ public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
+ if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
connected = true;
- ClusterAPIProtos.ServerAddress rpcAddress = msg.getConnectMsg().getServerAddress();
+ ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort());
remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
listener.onConnected(GrpcSession.this);
}
if (connected) {
- handleToRpcServerMessage(msg);
+ listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage);
}
}
@@ -83,37 +83,13 @@ public final class GrpcSession implements Closeable {
};
}
- private void handleToRpcServerMessage(ClusterAPIProtos.ToRpcServerMessage msg) {
- if (msg.hasToPluginRpcMsg()) {
- listener.onToPluginRpcMsg(GrpcSession.this, msg.getToPluginRpcMsg());
- }
- if (msg.hasToDeviceActorRpcMsg()) {
- listener.onToDeviceActorRpcMsg(GrpcSession.this, msg.getToDeviceActorRpcMsg());
- }
- if (msg.hasToDeviceSessionActorRpcMsg()) {
- listener.onToDeviceSessionActorRpcMsg(GrpcSession.this, msg.getToDeviceSessionActorRpcMsg());
- }
- if (msg.hasToDeviceActorNotificationRpcMsg()) {
- listener.onToDeviceActorNotificationRpcMsg(GrpcSession.this, msg.getToDeviceActorNotificationRpcMsg());
- }
- if (msg.hasToDeviceRpcRequestRpcMsg()) {
- listener.onToDeviceRpcRequestRpcMsg(GrpcSession.this, msg.getToDeviceRpcRequestRpcMsg());
- }
- if (msg.hasToPluginRpcResponseRpcMsg()) {
- listener.onFromDeviceRpcResponseRpcMsg(GrpcSession.this, msg.getToPluginRpcResponseRpcMsg());
- }
- if (msg.hasToAllNodesRpcMsg()) {
- listener.onToAllNodesRpcMessage(GrpcSession.this, msg.getToAllNodesRpcMsg());
- }
- }
-
public void initOutputStream() {
if (client) {
listener.onConnected(GrpcSession.this);
}
}
- public void sendMsg(ClusterAPIProtos.ToRpcServerMessage msg) {
+ public void sendMsg(ClusterAPIProtos.ClusterMessage msg) {
outputStream.onNext(msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
index 44e0693..266b1f5 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
@@ -26,20 +26,7 @@ public interface GrpcSessionListener {
void onDisconnected(GrpcSession session);
- void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg);
-
- void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg);
-
- void onToDeviceActorNotificationRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg);
-
- void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg);
-
- void onToAllNodesRpcMessage(GrpcSession grpcSession, ClusterAPIProtos.ToAllNodesRpcMessage toAllNodesRpcMessage);
-
- void onToDeviceRpcRequestRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg);
-
- void onFromDeviceRpcResponseRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg);
+ void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage);
void onError(GrpcSession session, Throwable t);
-
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
index 5d26fae..33f3847 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
@@ -17,32 +17,16 @@ package org.thingsboard.server.service.cluster.rpc;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
/**
* @author Andrew Shvayka
*/
-public interface RpcMsgListener {
-
- void onMsg(DeviceToDeviceActorMsg msg);
-
- void onMsg(ToDeviceActorNotificationMsg msg);
-
- void onMsg(ToDeviceSessionActorMsg msg);
-
- void onMsg(ToAllNodesMsg nodeMsg);
-
- void onMsg(ToPluginActorMsg msg);
-
- void onMsg(RpcSessionCreateRequestMsg msg);
-
- void onMsg(RpcSessionTellMsg rpcSessionTellMsg);
-
- void onMsg(RpcBroadcastMsg rpcBroadcastMsg);
+public interface RpcMsgListener {
+ void onReceivedMsg(ServerAddress remoteServer, ClusterAPIProtos.ClusterMessage msg);
+ void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
+ void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
+ void onBroadcastMsg(RpcBroadcastMsg msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
new file mode 100644
index 0000000..248e9f3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+public interface DataDecodingEncodingService {
+
+ Optional<TbActorMsg> decode(byte[] byteArray);
+
+ byte[] encode(TbActorMsg msq);
+
+ ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+ TbActorMsg msg);
+
+}
+
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
new file mode 100644
index 0000000..2cf9299
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
+
+
+@Slf4j
+@Service
+public class ProtoWithJavaSerializationDecodingEncodingService implements DataDecodingEncodingService {
+
+
+ @Override
+ public Optional<TbActorMsg> decode(byte[] byteArray) {
+ try {
+ TbActorMsg msg = (TbActorMsg) SerializationUtils.deserialize(byteArray);
+ return Optional.of(msg);
+
+ } catch (IllegalArgumentException e) {
+ log.error("Error during deserialization message, [{}]", e.getMessage());
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public byte[] encode(TbActorMsg msq) {
+ return SerializationUtils.serialize(msq);
+ }
+
+ @Override
+ public ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+ TbActorMsg msg) {
+ return ClusterAPIProtos.ClusterMessage
+ .newBuilder()
+ .setServerAddress(ClusterAPIProtos.ServerAddress
+ .newBuilder()
+ .setHost(serverAddress.getHost())
+ .setPort(serverAddress.getPort())
+ .build())
+ .setMessageType(CLUSTER_ACTOR_MESSAGE)
+ .setPayload(ByteString.copyFrom(encode(msg))).build();
+
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 9674490..6a1a69a 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
@@ -38,6 +40,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.security.model.SecurityUser;
@@ -135,23 +138,16 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
@Override
public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
- forward(deviceId, rpcMsg, rpcService::tell);
+ forward(deviceId, rpcMsg);
}
private void sendRpcRequest(ToDeviceRpcRequest msg) {
log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
- forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
+ forward(msg.getDeviceId(), rpcMsg);
}
- private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
- Optional<ServerAddress> instance = routingService.resolveById(deviceId);
- if (instance.isPresent()) {
- log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
- rpcFunction.accept(instance.get(), msg);
- } else {
- log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
- actorService.onMsg(msg);
- }
+ private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
+ actorService.onMsg(new SendToClusterMsg(deviceId, msg));
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 00a337a..f3cd32c 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -18,31 +18,42 @@ package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.DonAsynchron;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
@@ -53,15 +64,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
/**
* Created by ashvayka on 27.03.18.
@@ -83,6 +97,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
private ClusterRoutingService routingService;
@Autowired
+ private ClusterRpcService rpcService;
+
+ @Autowired
@Lazy
private DeviceStateService stateService;
@@ -106,7 +123,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
-
private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
@Override
@@ -117,7 +133,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
ServerAddress address = server.get();
log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
subscription = new Subscription(sub, true, address);
-// rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
+ tellNewSubscription(address, sessionId, subscription);
} else {
log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
subscription = new Subscription(sub, true);
@@ -189,6 +205,174 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
, System.currentTimeMillis())), callback);
}
+ @Override
+ public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.SubscriptionProto proto;
+ try {
+ proto = ClusterAPIProtos.SubscriptionProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(
+ Collectors.toMap(ClusterAPIProtos.SubscriptionKetStateProto::getKey, ClusterAPIProtos.SubscriptionKetStateProto::getTs));
+ Subscription subscription = new Subscription(
+ new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(),
+ EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
+ TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
+ false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort()));
+
+ addRemoteWsSubscription(serverAddress, proto.getSessionId(), subscription);
+ }
+
+ @Override
+ public void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.SubscriptionUpdateProto proto;
+ try {
+ proto = ClusterAPIProtos.SubscriptionUpdateProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ SubscriptionUpdate update = convert(proto);
+ String sessionId = proto.getSessionId();
+ log.trace("[{}] Processing remote subscription onUpdate [{}]", sessionId, update);
+ Optional<Subscription> subOpt = getSubscription(sessionId, update.getSubscriptionId());
+ if (subOpt.isPresent()) {
+ updateSubscriptionState(sessionId, subOpt.get(), update);
+ wsService.sendWsMsg(sessionId, update);
+ }
+ }
+
+ @Override
+ public void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.SubscriptionCloseProto proto;
+ try {
+ proto = ClusterAPIProtos.SubscriptionCloseProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ removeSubscription(proto.getSessionId(), proto.getSubscriptionId());
+ }
+
+ @Override
+ public void onRemoteSessionClose(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.SessionCloseProto proto;
+ try {
+ proto = ClusterAPIProtos.SessionCloseProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ cleanupRemoteWsSessionSubscriptions(proto.getSessionId());
+ }
+
+ @Override
+ public void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.AttributeUpdateProto proto;
+ try {
+ proto = ClusterAPIProtos.AttributeUpdateProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ onAttributesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), proto.getScope(),
+ proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
+ }
+
+ @Override
+ public void onRemoteTsUpdate(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.TimeseriesUpdateProto proto;
+ try {
+ proto = ClusterAPIProtos.TimeseriesUpdateProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ onTimeseriesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
+ proto.getDataList().stream().map(this::toTimeseries).collect(Collectors.toList()));
+ }
+
+ @Override
+ public void onClusterUpdate() {
+ log.trace("Processing cluster onUpdate msg!");
+ Iterator<Map.Entry<EntityId, Set<Subscription>>> deviceIterator = subscriptionsByEntityId.entrySet().iterator();
+ while (deviceIterator.hasNext()) {
+ Map.Entry<EntityId, Set<Subscription>> e = deviceIterator.next();
+ Set<Subscription> subscriptions = e.getValue();
+ Optional<ServerAddress> newAddressOptional = routingService.resolveById(e.getKey());
+ if (newAddressOptional.isPresent()) {
+ newAddressOptional.ifPresent(serverAddress -> checkSubsciptionsNewAddress(serverAddress, subscriptions));
+ } else {
+ checkSubsciptionsPrevAddress(subscriptions);
+ }
+ if (subscriptions.size() == 0) {
+ log.trace("[{}] No more subscriptions for this device on current server.", e.getKey());
+ deviceIterator.remove();
+ }
+ }
+ }
+
+ private void checkSubsciptionsNewAddress(ServerAddress newAddress, Set<Subscription> subscriptions) {
+ Iterator<Subscription> subscriptionIterator = subscriptions.iterator();
+ while (subscriptionIterator.hasNext()) {
+ Subscription s = subscriptionIterator.next();
+ if (s.isLocal()) {
+ if (!newAddress.equals(s.getServer())) {
+ log.trace("[{}] Local subscription is now handled on new server [{}]", s.getWsSessionId(), newAddress);
+ s.setServer(newAddress);
+ tellNewSubscription(newAddress, s.getWsSessionId(), s);
+ }
+ } else {
+ log.trace("[{}] Remote subscription is now handled on new server address: [{}]", s.getWsSessionId(), newAddress);
+ subscriptionIterator.remove();
+ //TODO: onUpdate state of subscription by WsSessionId and other maps.
+ }
+ }
+ }
+
+ private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) {
+ for (Subscription s : subscriptions) {
+ if (s.isLocal() && s.getServer() != null) {
+ log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());
+ s.setServer(null);
+ } else {
+ log.trace("[{}] Remote subscription is on up to date server address.", s.getWsSessionId());
+ }
+ }
+ }
+
+ private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
+ EntityId entityId = subscription.getEntityId();
+ log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
+ registerSubscription(sessionId, entityId, subscription);
+ if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
+ final Map<String, Long> keyStates = subscription.getKeyStates();
+ DonAsynchron.withCallback(attrService.find(entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> {
+ List<TsKvEntry> missedUpdates = new ArrayList<>();
+ values.forEach(latestEntry -> {
+ if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
+ missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
+ }
+ });
+ if (!missedUpdates.isEmpty()) {
+ tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+ }
+ },
+ e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
+ } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
+ long curTs = System.currentTimeMillis();
+ List<TsKvQuery> queries = new ArrayList<>();
+ subscription.getKeyStates().entrySet().forEach(e -> {
+ queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+ });
+
+ DonAsynchron.withCallback(tsService.findAll(entityId, queries),
+ missedUpdates -> {
+ if (!missedUpdates.isEmpty()) {
+ tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+ }
+ },
+ e -> log.error("Failed to fetch missed updates.", e),
+ tsCallBackExecutor);
+ }
+ }
+
private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
if (!serverAddress.isPresent()) {
@@ -201,7 +385,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
}
} else {
-// rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
+ tellRemoteAttributesUpdate(serverAddress.get(), entityId, scope, attributes);
}
}
@@ -210,7 +394,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
if (!serverAddress.isPresent()) {
onLocalTimeseriesUpdate(entityId, ts);
} else {
-// rpcHandler.onTimeseriesUpdate(ctx, serverAddress.get(), entityId, entries);
+ tellRemoteTimeseriesUpdate(serverAddress.get(), entityId, ts);
}
}
@@ -256,8 +440,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
updateSubscriptionState(sessionId, s, update);
wsService.sendWsMsg(sessionId, update);
} else {
- //TODO: ashvayka
-// rpcHandler.onSubscriptionUpdate(ctx, s.getServer(), sessionId, update);
+ tellRemoteSubUpdate(s.getServer(), sessionId, update);
}
}
});
@@ -278,11 +461,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
}
- public void cleanupLocalWsSessionSubscriptions(String sessionId) {
+ private void cleanupLocalWsSessionSubscriptions(String sessionId) {
cleanupWsSessionSubscriptions(sessionId, true);
}
- public void cleanupRemoteWsSessionSubscriptions(String sessionId) {
+ private void cleanupRemoteWsSessionSubscriptions(String sessionId) {
cleanupWsSessionSubscriptions(sessionId, false);
}
@@ -320,14 +503,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
for (ServerAddress address : affectedServers) {
log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address);
-// rpcHandler.onSessionClose(ctx, address, sessionId);
+ tellRemoteSessionClose(address, sessionId);
}
}
private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) {
EntityId entityId = subscription.getEntityId();
if (subscription.isLocal() && subscription.getServer() != null) {
-// rpcHandler.onSubscriptionClose(ctx, subscription.getServer(), sessionId, subscription.getSubscriptionId());
+ tellRemoteSubClose(subscription.getServer(), sessionId, subscription.getSubscriptionId());
}
if (sessionSubscriptions.isEmpty()) {
log.debug("[{}] Removed last subscription for particular session.", sessionId);
@@ -379,4 +562,151 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
}
}, wsCallBackExecutor);
}
+
+ private void tellNewSubscription(ServerAddress address, String sessionId, Subscription sub) {
+ ClusterAPIProtos.SubscriptionProto.Builder builder = ClusterAPIProtos.SubscriptionProto.newBuilder();
+ builder.setSessionId(sessionId);
+ builder.setSubscriptionId(sub.getSubscriptionId());
+ builder.setEntityType(sub.getEntityId().getEntityType().name());
+ builder.setEntityId(sub.getEntityId().getId().toString());
+ builder.setType(sub.getType().name());
+ builder.setAllKeys(sub.isAllKeys());
+ builder.setScope(sub.getScope());
+ sub.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(
+ ClusterAPIProtos.SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
+ rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE, builder.build().toByteArray());
+ }
+
+ private void tellRemoteSubUpdate(ServerAddress address, String sessionId, SubscriptionUpdate update) {
+ ClusterAPIProtos.SubscriptionUpdateProto.Builder builder = ClusterAPIProtos.SubscriptionUpdateProto.newBuilder();
+ builder.setSessionId(sessionId);
+ builder.setSubscriptionId(update.getSubscriptionId());
+ builder.setErrorCode(update.getErrorCode());
+ if (update.getErrorMsg() != null) {
+ builder.setErrorMsg(update.getErrorMsg());
+ }
+ update.getData().entrySet().forEach(
+ e -> {
+ ClusterAPIProtos.SubscriptionUpdateValueListProto.Builder dataBuilder = ClusterAPIProtos.SubscriptionUpdateValueListProto.newBuilder();
+
+ dataBuilder.setKey(e.getKey());
+ e.getValue().forEach(v -> {
+ Object[] array = (Object[]) v;
+ dataBuilder.addTs((long) array[0]);
+ dataBuilder.addValue((String) array[1]);
+ });
+
+ builder.addData(dataBuilder.build());
+ }
+ );
+ rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE, builder.build().toByteArray());
+ }
+
+ private void tellRemoteAttributesUpdate(ServerAddress address, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+ ClusterAPIProtos.AttributeUpdateProto.Builder builder = ClusterAPIProtos.AttributeUpdateProto.newBuilder();
+ builder.setEntityId(entityId.getId().toString());
+ builder.setEntityType(entityId.getEntityType().name());
+ builder.setScope(scope);
+ attributes.forEach(v -> builder.addData(toKeyValueProto(v.getLastUpdateTs(), v).build()));
+ rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE, builder.build().toByteArray());
+ }
+
+ private void tellRemoteTimeseriesUpdate(ServerAddress address, EntityId entityId, List<TsKvEntry> ts) {
+ ClusterAPIProtos.TimeseriesUpdateProto.Builder builder = ClusterAPIProtos.TimeseriesUpdateProto.newBuilder();
+ builder.setEntityId(entityId.getId().toString());
+ builder.setEntityType(entityId.getEntityType().name());
+ ts.forEach(v -> builder.addData(toKeyValueProto(v.getTs(), v).build()));
+ rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE, builder.build().toByteArray());
+ }
+
+ private void tellRemoteSessionClose(ServerAddress address, String sessionId) {
+ ClusterAPIProtos.SessionCloseProto proto = ClusterAPIProtos.SessionCloseProto.newBuilder().setSessionId(sessionId).build();
+ rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE, proto.toByteArray());
+ }
+
+ private void tellRemoteSubClose(ServerAddress address, String sessionId, int subscriptionId) {
+ ClusterAPIProtos.SubscriptionCloseProto proto = ClusterAPIProtos.SubscriptionCloseProto.newBuilder().setSessionId(sessionId).setSubscriptionId(subscriptionId).build();
+ rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE, proto.toByteArray());
+ }
+
+ private ClusterAPIProtos.KeyValueProto.Builder toKeyValueProto(long ts, KvEntry attr) {
+ ClusterAPIProtos.KeyValueProto.Builder dataBuilder = ClusterAPIProtos.KeyValueProto.newBuilder();
+ dataBuilder.setKey(attr.getKey());
+ dataBuilder.setTs(ts);
+ dataBuilder.setValueType(attr.getDataType().ordinal());
+ switch (attr.getDataType()) {
+ case BOOLEAN:
+ Optional<Boolean> booleanValue = attr.getBooleanValue();
+ booleanValue.ifPresent(dataBuilder::setBoolValue);
+ break;
+ case LONG:
+ Optional<Long> longValue = attr.getLongValue();
+ longValue.ifPresent(dataBuilder::setLongValue);
+ break;
+ case DOUBLE:
+ Optional<Double> doubleValue = attr.getDoubleValue();
+ doubleValue.ifPresent(dataBuilder::setDoubleValue);
+ break;
+ case STRING:
+ Optional<String> stringValue = attr.getStrValue();
+ stringValue.ifPresent(dataBuilder::setStrValue);
+ break;
+ }
+ return dataBuilder;
+ }
+
+ private AttributeKvEntry toAttribute(ClusterAPIProtos.KeyValueProto proto) {
+ return new BaseAttributeKvEntry(getKvEntry(proto), proto.getTs());
+ }
+
+ private TsKvEntry toTimeseries(ClusterAPIProtos.KeyValueProto proto) {
+ return new BasicTsKvEntry(proto.getTs(), getKvEntry(proto));
+ }
+
+ private KvEntry getKvEntry(ClusterAPIProtos.KeyValueProto proto) {
+ KvEntry entry = null;
+ DataType type = DataType.values()[proto.getValueType()];
+ switch (type) {
+ case BOOLEAN:
+ entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
+ break;
+ case LONG:
+ entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
+ break;
+ case DOUBLE:
+ entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
+ break;
+ case STRING:
+ entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
+ break;
+ }
+ return entry;
+ }
+
+ private SubscriptionUpdate convert(ClusterAPIProtos.SubscriptionUpdateProto proto) {
+ if (proto.getErrorCode() > 0) {
+ return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
+ } else {
+ Map<String, List<Object>> data = new TreeMap<>();
+ proto.getDataList().forEach(v -> {
+ List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
+ for (int i = 0; i < v.getTsCount(); i++) {
+ Object[] value = new Object[2];
+ value[0] = v.getTs(i);
+ value[1] = v.getValue(i);
+ values.add(value);
+ }
+ });
+ return new SubscriptionUpdate(proto.getSubscriptionId(), data);
+ }
+ }
+
+ private Optional<Subscription> getSubscription(String sessionId, int subscriptionId) {
+ Subscription state = null;
+ Map<Integer, Subscription> subMap = subscriptionsByWsSessionId.get(sessionId);
+ if (subMap != null) {
+ state = subMap.get(subscriptionId);
+ }
+ return Optional.ofNullable(state);
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 57f3876..3ea0cee 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -21,7 +21,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.hazelcast.util.function.Consumer;
+import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 923d06b..964c494 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -17,7 +17,10 @@ package org.thingsboard.server.service.telemetry;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
/**
* Created by ashvayka on 27.03.18.
@@ -30,4 +33,17 @@ public interface TelemetrySubscriptionService extends RuleEngineTelemetryService
void removeSubscription(String sessionId, int cmdId);
+ void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data);
+
+ void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] bytes);
+
+ void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] bytes);
+
+ void onRemoteSessionClose(ServerAddress serverAddress, byte[] bytes);
+
+ void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] bytes);
+
+ void onRemoteTsUpdate(ServerAddress serverAddress, byte[] bytes);
+
+ void onClusterUpdate();
}
application/src/main/proto/cluster.proto 121(+75 -46)
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 3bb0db7..90917e1 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -19,76 +19,105 @@ package cluster;
option java_package = "org.thingsboard.server.gen.cluster";
option java_outer_classname = "ClusterAPIProtos";
+service ClusterRpcService {
+ rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
+}
+message ClusterMessage {
+ MessageType messageType = 1;
+ MessageMataInfo messageMetaInfo = 2;
+ ServerAddress serverAddress = 3;
+ bytes payload = 4;
+}
+
message ServerAddress {
string host = 1;
int32 port = 2;
}
-message Uid {
- sint64 pluginUuidMsb = 1;
- sint64 pluginUuidLsb = 2;
+message MessageMataInfo {
+ string payloadMetaInfo = 1;
+ repeated string tags = 2;
}
-message PluginAddress {
- Uid pluginId = 1;
- Uid tenantId = 2;
+enum MessageType {
+
+ //Cluster control messages
+ RPC_SESSION_CREATE_REQUEST_MSG = 0;
+ TO_ALL_NODES_MSG = 1;
+ RPC_SESSION_TELL_MSG = 2;
+ RPC_BROADCAST_MSG = 3;
+ CONNECT_RPC_MESSAGE =4;
+
+ CLUSTER_ACTOR_MESSAGE = 5;
+ // Messages related to TelemetrySubscriptionService
+ CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE = 6;
+ CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE = 7;
+ CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE = 8;
+ CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE = 9;
+ CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
+ CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
}
-message ToPluginRpcMessage {
- PluginAddress address = 1;
- int32 clazz = 2;
- bytes data = 3;
+// Messages related to CLUSTER_TELEMETRY_MESSAGE
+message SubscriptionProto {
+ string sessionId = 1;
+ int32 subscriptionId = 2;
+ string entityType = 3;
+ string entityId = 4;
+ string type = 5;
+ bool allKeys = 6;
+ repeated SubscriptionKetStateProto keyStates = 7;
+ string scope = 8;
}
-message ToDeviceActorRpcMessage {
- bytes data = 1;
+message SubscriptionUpdateProto {
+ string sessionId = 1;
+ int32 subscriptionId = 2;
+ int32 errorCode = 3;
+ string errorMsg = 4;
+ repeated SubscriptionUpdateValueListProto data = 5;
}
-message ToDeviceSessionActorRpcMessage {
- bytes data = 1;
+message AttributeUpdateProto {
+ string entityType = 1;
+ string entityId = 2;
+ string scope = 3;
+ repeated KeyValueProto data = 4;
}
-message ToDeviceActorNotificationRpcMessage {
- bytes data = 1;
+message TimeseriesUpdateProto {
+ string entityType = 1;
+ string entityId = 2;
+ repeated KeyValueProto data = 4;
}
-message ToAllNodesRpcMessage {
- bytes data = 1;
+message SessionCloseProto {
+ string sessionId = 1;
}
-message ConnectRpcMessage {
- ServerAddress serverAddress = 1;
+message SubscriptionCloseProto {
+ string sessionId = 1;
+ int32 subscriptionId = 2;
}
-message ToDeviceRpcRequestRpcMessage {
- Uid deviceTenantId = 2;
- Uid deviceId = 3;
-
- Uid msgId = 4;
- bool oneway = 5;
- int64 expTime = 6;
- string method = 7;
- string params = 8;
-}
-
-message ToPluginRpcResponseRpcMessage {
- Uid msgId = 2;
- string response = 3;
- string error = 4;
+message SubscriptionKetStateProto {
+ string key = 1;
+ int64 ts = 2;
}
-message ToRpcServerMessage {
- ConnectRpcMessage connectMsg = 1;
- ToPluginRpcMessage toPluginRpcMsg = 2;
- ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
- ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
- ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
- ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
- ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
- ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
+message SubscriptionUpdateValueListProto {
+ string key = 1;
+ repeated int64 ts = 2;
+ repeated string value = 3;
}
-service ClusterRpcService {
- rpc handlePluginMsgs(stream ToRpcServerMessage) returns (stream ToRpcServerMessage) {}
+message KeyValueProto {
+ string key = 1;
+ int64 ts = 2;
+ int32 valueType = 3;
+ string strValue = 4;
+ int64 longValue = 5;
+ double doubleValue = 6;
+ bool boolValue = 7;
}
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 1779912..978a570 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -25,7 +25,7 @@
</encoder>
</appender>
- <logger name="org.thingsboard.server" level="TRACE" />
+ <logger name="org.thingsboard.server" level="INFO" />
<logger name="akka" level="INFO" />
<root level="INFO">
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 63c5c44..8d37740 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -58,6 +58,8 @@ cluster:
hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
# Amount of virtual nodes in consistent hash ring.
vitrual_nodes_size: "${CLUSTER_VIRTUAL_NODES_SIZE:16}"
+ # Queue partition id for current node
+ partition_id: "${QUEUE_PARTITION_ID:0}"
# Plugins configuration parameters
plugins:
@@ -106,7 +108,7 @@ mqtt:
# CoAP server parameters
coap:
# Enable/disable coap transport protocol.
- enabled: "${COAP_ENABLED:true}"
+ enabled: "${COAP_ENABLED:false}"
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
bind_port: "${COAP_BIND_PORT:5683}"
adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}"
@@ -331,6 +333,7 @@ rule:
#Message queue cleanup period in seconds
cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
+
# PostgreSQL DAO Configuration
#spring:
# data:
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
new file mode 100644
index 0000000..4b71cae
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Created by ashvayka on 11.05.18.
+ */
+public class DbConfigurationTestRule implements TestRule {
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ return null;
+ }
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java
index 6ac5136..e832807 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java
@@ -17,9 +17,10 @@ package org.thingsboard.server.common.data.id;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.io.Serializable;
import java.util.UUID;
-public abstract class IdBased<I extends UUIDBased> {
+public abstract class IdBased<I extends UUIDBased> implements Serializable {
protected I id;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
new file mode 100644
index 0000000..d3bc8e9
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.cluster;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+@Data
+public class SendToClusterMsg implements TbActorMsg {
+
+ private TbActorMsg msg;
+ private EntityId entityId;
+
+ public SendToClusterMsg(EntityId entityId, TbActorMsg msg) {
+ this.entityId = entityId;
+ this.msg = msg;
+ }
+
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SEND_TO_CLUSTER_MSG;
+ }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
index 1dc33c0..877689d 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
@@ -15,10 +15,12 @@
*/
package org.thingsboard.server.common.msg.cluster;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
import java.io.Serializable;
/**
* @author Andrew Shvayka
*/
-public interface ToAllNodesMsg extends Serializable {
+public interface ToAllNodesMsg extends Serializable, TbActorMsg {
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
index 1da19cb..92c5105 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
@@ -24,7 +24,7 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.SessionType;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import java.util.Optional;
@@ -45,7 +45,7 @@ public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg {
this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg);
}
- public BasicDeviceToDeviceActorMsg(ToDeviceActorSessionMsg msg, SessionType sessionType) {
+ public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) {
this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg());
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index d63456e..7702788 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -29,6 +29,11 @@ public enum MsgType {
CLUSTER_EVENT_MSG,
/**
+ * All messages, could be send to cluster
+ */
+ SEND_TO_CLUSTER_MSG,
+
+ /**
* ADDED/UPDATED/DELETED events for main entities.
*
* See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
@@ -96,6 +101,6 @@ public enum MsgType {
/**
* Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
*/
- RULE_ENGINE_QUEUE_PUT_ACK_MSG;
+ RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index c104281..2ee8432 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -20,7 +20,6 @@ import lombok.ToString;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
-import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index a24bdcd..49fad13 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.common.msg.session.ctrl;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
public class SessionCloseMsg implements SessionCtrlMsg {
@@ -60,4 +61,8 @@ public class SessionCloseMsg implements SessionCtrlMsg {
return timeout;
}
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_CTRL_MSG;
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
index 19ca219..8082f72 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
@@ -15,8 +15,9 @@
*/
package org.thingsboard.server.common.msg.session;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
-public interface SessionCtrlMsg extends SessionAwareMsg {
+public interface SessionCtrlMsg extends SessionAwareMsg, TbActorMsg {
}
dao/pom.xml 12(+0 -12)
diff --git a/dao/pom.xml b/dao/pom.xml
index 1d2d962..75ca13c 100644
--- a/dao/pom.xml
+++ b/dao/pom.xml
@@ -153,22 +153,10 @@
<artifactId>curator-x-discovery</artifactId>
</dependency>
<dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-zookeeper</artifactId>
- </dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- </dependency>
- <dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-spring</artifactId>
- </dependency>
- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
index c0ca37a..f84cc1d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
@@ -222,15 +222,14 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(AlarmId alarmId) {
log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId);
validateId(alarmId, "Incorrect alarmId " + alarmId);
- return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()),
- (AsyncFunction<Alarm, AlarmInfo>) alarm1 -> {
- AlarmInfo alarmInfo = new AlarmInfo(alarm1);
+ return Futures.transformAsync(alarmDao.findAlarmByIdAsync(alarmId.getId()),
+ a -> {
+ AlarmInfo alarmInfo = new AlarmInfo(a);
return Futures.transform(
- entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
- originatorName -> {
- alarmInfo.setOriginatorName(originatorName);
- return alarmInfo;
- }
+ entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> {
+ alarmInfo.setOriginatorName(originatorName);
+ return alarmInfo;
+ }
);
});
}
@@ -239,18 +238,17 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
public ListenableFuture<TimePageData<AlarmInfo>> findAlarms(AlarmQuery query) {
ListenableFuture<List<AlarmInfo>> alarms = alarmDao.findAlarms(query);
if (query.getFetchOriginator() != null && query.getFetchOriginator().booleanValue()) {
- alarms = Futures.transform(alarms, (AsyncFunction<List<AlarmInfo>, List<AlarmInfo>>) input -> {
+ alarms = Futures.transformAsync(alarms, input -> {
List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
for (AlarmInfo alarmInfo : input) {
alarmFutures.add(Futures.transform(
- entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
- originatorName -> {
- if (originatorName == null) {
- originatorName = "Deleted";
- }
- alarmInfo.setOriginatorName(originatorName);
- return alarmInfo;
- }
+ entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> {
+ if (originatorName == null) {
+ originatorName = "Deleted";
+ }
+ alarmInfo.setOriginatorName(originatorName);
+ return alarmInfo;
+ }
));
}
return Futures.successfulAsList(alarmFutures);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java
index 1233c7f..6785f2e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java
@@ -102,12 +102,12 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao<AlarmEntity, Al
}
String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink());
- return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
+ return Futures.transformAsync(relations, input -> {
List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
for (EntityRelation relation : input) {
alarmFutures.add(Futures.transform(
findAlarmByIdAsync(relation.getTo().getId()),
- (Function<Alarm, AlarmInfo>) AlarmInfo::new));
+ AlarmInfo::new));
}
return Futures.successfulAsList(alarmFutures);
});
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
index dcd9523..7bb67f0 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
@@ -194,10 +194,10 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
@Override
public ListenableFuture<List<Asset>> findAssetsByQuery(AssetSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
- ListenableFuture<List<Asset>> assets = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Asset>>) relations1 -> {
+ ListenableFuture<List<Asset>> assets = Futures.transformAsync(relations, r -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<Asset>> futures = new ArrayList<>();
- for (EntityRelation relation : relations1) {
+ for (EntityRelation relation : r) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == EntityType.ASSET) {
futures.add(findAssetByIdAsync(new AssetId(entityId.getId())));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java b/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java
index ca94822..fd61976 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java
@@ -16,10 +16,17 @@
package org.thingsboard.server.dao.cassandra;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.DefaultPropertyMapper;
import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingConfiguration;
import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.PropertyAccessStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -149,7 +156,13 @@ public abstract class AbstractCassandraCluster {
} else {
session = cluster.connect();
}
- mappingManager = new MappingManager(session);
+// For Cassandra Driver version 3.5.0
+ DefaultPropertyMapper propertyMapper = new DefaultPropertyMapper();
+ propertyMapper.setPropertyAccessStrategy(PropertyAccessStrategy.FIELDS);
+ MappingConfiguration configuration = MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
+ mappingManager = new MappingManager(session, configuration);
+// For Cassandra Driver version 3.0.0
+// mappingManager = new MappingManager(session);
break;
} catch (Exception e) {
log.warn("Failed to initialize cassandra cluster due to {}. Will retry in {} ms", e.getMessage(), initRetryInterval);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java
index 8091b2a..70afed5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java
@@ -77,7 +77,7 @@ public class CassandraDashboardInfoDao extends CassandraAbstractSearchTextDao<Da
ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink);
- return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DashboardInfo>>) input -> {
+ return Futures.transformAsync(relations, input -> {
List<ListenableFuture<DashboardInfo>> dashboardFutures = new ArrayList<>(input.size());
for (EntityRelation relation : input) {
dashboardFutures.add(findByIdAsync(relation.getTo().getId()));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 9120619..0d19ac1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -227,10 +227,10 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
@Override
public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
- ListenableFuture<List<Device>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Device>>) relations1 -> {
+ ListenableFuture<List<Device>> devices = Futures.transformAsync(relations, r -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<Device>> futures = new ArrayList<>();
- for (EntityRelation relation : relations1) {
+ for (EntityRelation relation : r) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == EntityType.DEVICE) {
futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId())));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
index d250563..d4aef86 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
@@ -36,14 +36,14 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
private final ListenableFuture<Void> rateLimitFuture;
public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
- this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> {
+ this.rateLimitFuture = Futures.catchingAsync(rateLimiter.acquireAsync(), Throwable.class, t -> {
if (!(t instanceof BufferLimitException)) {
rateLimiter.release();
}
return Futures.immediateFailedFuture(t);
});
this.originalFuture = Futures.transform(rateLimitFuture,
- (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
+ i -> executeAsyncWithRelease(rateLimiter, session, statement));
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
index a0cb1fb..7b2b391 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.dao.relation;
import com.google.common.base.Function;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
@@ -29,12 +28,23 @@ import org.springframework.cache.annotation.Caching;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.relation.*;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.relation.EntityRelationInfo;
+import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+import org.thingsboard.server.common.data.relation.EntityTypeFilter;
+import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import javax.annotation.Nullable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
@@ -175,7 +185,7 @@ public class BaseRelationService implements RelationService {
inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
}
ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
- ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, (List<List<EntityRelation>> relations) ->
+ ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, relations ->
getBooleans(relations, cache, true));
ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
@@ -191,8 +201,7 @@ public class BaseRelationService implements RelationService {
outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));
}
ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
- Futures.transform(outboundRelations, (Function<List<List<EntityRelation>>, List<Boolean>>) relations ->
- getBooleans(relations, cache, false));
+ Futures.transform(outboundRelations, relations -> getBooleans(relations, cache, false));
boolean outboundDeleteResult = relationDao.deleteOutboundRelations(entity);
return inboundDeleteResult && outboundDeleteResult;
@@ -201,7 +210,7 @@ public class BaseRelationService implements RelationService {
private List<Boolean> getBooleans(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
List<Boolean> results = new ArrayList<>();
for (List<EntityRelation> relationList : relations) {
- relationList.stream().forEach(relation -> checkFromDeleteSync(cache, results, relation, isRemove));
+ relationList.forEach(relation -> checkFromDeleteSync(cache, results, relation, isRemove));
}
return results;
}
@@ -223,8 +232,8 @@ public class BaseRelationService implements RelationService {
inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
}
ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
- ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations,
- (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
+ ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelations,
+ relations -> {
List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, true);
return Futures.allAsList(results);
});
@@ -236,7 +245,7 @@ public class BaseRelationService implements RelationService {
outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));
}
ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
- Futures.transform(outboundRelations, (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
+ Futures.transformAsync(outboundRelations, relations -> {
List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, false);
return Futures.allAsList(results);
});
@@ -248,7 +257,7 @@ public class BaseRelationService implements RelationService {
private List<ListenableFuture<Boolean>> getListenableFutures(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
List<ListenableFuture<Boolean>> results = new ArrayList<>();
for (List<EntityRelation> relationList : relations) {
- relationList.stream().forEach(relation -> checkFromDeleteAsync(cache, results, relation, isRemove));
+ relationList.forEach(relation -> checkFromDeleteAsync(cache, results, relation, isRemove));
}
return results;
}
@@ -315,17 +324,16 @@ public class BaseRelationService implements RelationService {
validate(from);
validateTypeGroup(typeGroup);
ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByFrom(from, typeGroup);
- ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
- (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
+ return Futures.transformAsync(relations,
+ relations1 -> {
List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
- relations1.stream().forEach(relation ->
+ relations1.forEach(relation ->
futures.add(fetchRelationInfoAsync(relation,
- relation2 -> relation2.getTo(),
- (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName)))
+ EntityRelation::getTo,
+ EntityRelationInfo::setToName))
);
return Futures.successfulAsList(futures);
});
- return relationsInfo;
}
@Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}")
@@ -371,30 +379,27 @@ public class BaseRelationService implements RelationService {
validate(to);
validateTypeGroup(typeGroup);
ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByTo(to, typeGroup);
- ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
- (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
+ return Futures.transformAsync(relations,
+ relations1 -> {
List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
- relations1.stream().forEach(relation ->
+ relations1.forEach(relation ->
futures.add(fetchRelationInfoAsync(relation,
- relation2 -> relation2.getFrom(),
- (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setFromName(entityName)))
+ EntityRelation::getFrom,
+ EntityRelationInfo::setFromName))
);
return Futures.successfulAsList(futures);
});
- return relationsInfo;
}
private ListenableFuture<EntityRelationInfo> fetchRelationInfoAsync(EntityRelation relation,
Function<EntityRelation, EntityId> entityIdGetter,
BiConsumer<EntityRelationInfo, String> entityNameSetter) {
ListenableFuture<String> entityName = entityService.fetchEntityNameAsync(entityIdGetter.apply(relation));
- ListenableFuture<EntityRelationInfo> entityRelationInfo =
- Futures.transform(entityName, (Function<String, EntityRelationInfo>) entityName1 -> {
- EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation);
- entityNameSetter.accept(entityRelationInfo1, entityName1);
- return entityRelationInfo1;
- });
- return entityRelationInfo;
+ return Futures.transform(entityName, entityName1 -> {
+ EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation);
+ entityNameSetter.accept(entityRelationInfo1, entityName1);
+ return entityRelationInfo1;
+ });
}
@Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
@@ -429,7 +434,7 @@ public class BaseRelationService implements RelationService {
try {
ListenableFuture<Set<EntityRelation>> relationSet = findRelationsRecursively(params.getEntityId(), params.getDirection(), maxLvl, new ConcurrentHashMap<>());
- return Futures.transform(relationSet, (Function<Set<EntityRelation>, List<EntityRelation>>) input -> {
+ return Futures.transform(relationSet, input -> {
List<EntityRelation> relations = new ArrayList<>();
if (filters == null || filters.isEmpty()) {
relations.addAll(input);
@@ -453,10 +458,10 @@ public class BaseRelationService implements RelationService {
log.trace("Executing findInfoByQuery [{}]", query);
ListenableFuture<List<EntityRelation>> relations = findByQuery(query);
EntitySearchDirection direction = query.getParameters().getDirection();
- ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
- (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
+ return Futures.transformAsync(relations,
+ relations1 -> {
List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
- relations1.stream().forEach(relation ->
+ relations1.forEach(relation ->
futures.add(fetchRelationInfoAsync(relation,
relation2 -> direction == EntitySearchDirection.FROM ? relation2.getTo() : relation2.getFrom(),
(EntityRelationInfo relationInfo, String entityName) -> {
@@ -469,7 +474,6 @@ public class BaseRelationService implements RelationService {
);
return Futures.successfulAsList(futures);
});
- return relationsInfo;
}
protected void validate(EntityRelation relation) {
@@ -575,7 +579,7 @@ public class BaseRelationService implements RelationService {
}
//TODO: try to remove this blocking operation
List<Set<EntityRelation>> relations = Futures.successfulAsList(futures).get();
- relations.forEach(r -> r.forEach(d -> children.add(d)));
+ relations.forEach(r -> r.forEach(children::add));
return Futures.immediateFuture(children);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java
index 0d64d5c..e092a47 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java
@@ -102,12 +102,12 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
}
String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink());
- return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
+ return Futures.transformAsync(relations, input -> {
List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
for (EntityRelation relation : input) {
alarmFutures.add(Futures.transform(
findAlarmByIdAsync(relation.getTo().getId()),
- (Function<Alarm, AlarmInfo>) AlarmInfo::new));
+ AlarmInfo::new));
}
return Futures.successfulAsList(alarmFutures);
});
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java
index 4d8d0b2..cc64e80 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java
@@ -86,7 +86,7 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao<DashboardInfoE
ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink);
- return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DashboardInfo>>) input -> {
+ return Futures.transformAsync(relations, input -> {
List<ListenableFuture<DashboardInfo>> dashboardFutures = new ArrayList<>(input.size());
for (EntityRelation relation : input) {
dashboardFutures.add(findByIdAsync(relation.getTo().getId()));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index cda4b16..eb1ef52 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -217,7 +217,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
- ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture,
+ ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor);
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 9150587..59dda63 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -692,4 +692,4 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
configuration text,
additional_info text,
PRIMARY KEY (id)
-);
\ No newline at end of file
+);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
index d214a70..fbed694 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.Session;
import java.util.List;
public class CustomCassandraCQLUnit extends BaseCassandraUnit {
- private List<CQLDataSet> dataSets;
+ protected List<CQLDataSet> dataSets;
public Session session;
public Cluster cluster;
diff --git a/dao/src/test/resources/cassandra-test.yaml b/dao/src/test/resources/cassandra-test.yaml
index 6463f64..e60f248 100644
--- a/dao/src/test/resources/cassandra-test.yaml
+++ b/dao/src/test/resources/cassandra-test.yaml
@@ -103,6 +103,8 @@ commitlog_directory: target/embeddedCassandra/commitlog
hints_directory: target/embeddedCassandra/hints
+cdc_raw_directory: target/embeddedCassandra/cdc
+
# policy for data disk failures:
# stop: shut down gossip and Thrift, leaving the node effectively dead, but
# can still be inspected via JMX.
docker/cluster-mode-thirdparty.yml 45(+45 -0)
diff --git a/docker/cluster-mode-thirdparty.yml b/docker/cluster-mode-thirdparty.yml
new file mode 100644
index 0000000..3aa5bb5
--- /dev/null
+++ b/docker/cluster-mode-thirdparty.yml
@@ -0,0 +1,45 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3.3'
+services:
+ zookeeper:
+ image: wurstmeister/zookeeper
+ networks:
+ - core
+ ports:
+ - "2181:2181"
+
+ cassandra:
+ image: cassandra:3.11.2
+ networks:
+ - core
+ ports:
+ - "7199:7199"
+ - "9160:9160"
+ - "9042:9042"
+
+ redis:
+ image: redis:4.0
+ networks:
+ - core
+ command: redis-server --maxclients 2000
+ ports:
+ - "6379:6379"
+
+networks:
+ core:
+
pom.xml 27(+5 -22)
diff --git a/pom.xml b/pom.xml
index 297512f..dfccb7d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -41,10 +41,10 @@
<logback.version>1.2.3</logback.version>
<mockito.version>1.9.5</mockito.version>
<rat.version>0.10</rat.version>
- <cassandra.version>3.0.7</cassandra.version>
- <cassandra-unit.version>3.0.0.1</cassandra-unit.version>
+ <cassandra.version>3.5.0</cassandra.version>
+ <cassandra-unit.version>3.3.0.2</cassandra-unit.version>
<takari-cpsuite.version>1.2.7</takari-cpsuite.version>
- <guava.version>18.0</guava.version>
+ <guava.version>21.0</guava.version>
<caffeine.version>2.6.1</caffeine.version>
<commons-lang3.version>3.4</commons-lang3.version>
<commons-validator.version>1.5.0</commons-validator.version>
@@ -59,17 +59,15 @@
<velocity.version>1.7</velocity.version>
<velocity-tools.version>2.0</velocity-tools.version>
<mail.version>1.4.3</mail.version>
- <curator.version>2.11.0</curator.version>
+ <curator.version>4.0.1</curator.version>
<protobuf.version>3.0.2</protobuf.version>
- <grpc.version>1.0.0</grpc.version>
+ <grpc.version>1.12.0</grpc.version>
<lombok.version>1.16.18</lombok.version>
<paho.client.version>1.1.0</paho.client.version>
<netty.version>4.1.22.Final</netty.version>
<os-maven-plugin.version>1.5.0</os-maven-plugin.version>
<rabbitmq.version>3.6.5</rabbitmq.version>
<kafka.version>0.9.0.0</kafka.version>
- <hazelcast.version>3.6.6</hazelcast.version>
- <hazelcast-zookeeper.version>3.6.1</hazelcast-zookeeper.version>
<surfire.version>2.19.1</surfire.version>
<jar-plugin.version>3.0.2</jar-plugin.version>
<springfox-swagger.version>2.6.1</springfox-swagger.version>
@@ -761,26 +759,11 @@
<version>${paho.client.version}</version>
</dependency>
<dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-spring</artifactId>
- <version>${hazelcast.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-zookeeper</artifactId>
- <version>${hazelcast-zookeeper.version}</version>
- </dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- <version>${hazelcast.version}</version>
- </dependency>
- <dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${springfox-swagger.version}</version>
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java
index d223f4d..ed54c62 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java
@@ -56,7 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
@Override
protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
- return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
+ return Futures.transformAsync(latest, a -> {
if (a != null && !a.getStatus().isCleared()) {
return clearAlarm(ctx, msg, a);
}
@@ -66,9 +66,9 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails());
- return Futures.transform(asyncDetails, (AsyncFunction<JsonNode, AlarmResult>) details -> {
+ return Futures.transformAsync(asyncDetails, details -> {
ListenableFuture<Boolean> clearFuture = ctx.getAlarmService().clearAlarm(alarm.getId(), details, System.currentTimeMillis());
- return Futures.transform(clearFuture, (AsyncFunction<Boolean, AlarmResult>) cleared -> {
+ return Futures.transformAsync(clearFuture, cleared -> {
if (cleared && details != null) {
alarm.setDetails(details);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
index 5c2109b..dcf9068 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
@@ -58,7 +58,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
@Override
protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
- return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
+ return Futures.transformAsync(latest, a -> {
if (a == null || a.getStatus().isCleared()) {
return createNewAlarm(ctx, msg);
} else {
@@ -70,10 +70,10 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null),
- (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
+ details -> buildAlarm(msg, details, ctx.getTenantId()));
ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
- (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
- return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm));
+ alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
+ return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm));
}
private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
@@ -85,7 +85,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
return ctx.getAlarmService().createOrUpdateAlarm(alarm);
}, ctx.getDbCallbackExecutor());
- return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a));
+ return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a));
}
private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
index 73e1945..be72833 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
@@ -43,8 +43,7 @@ public class EntitiesCustomerIdAsyncLoader {
}
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
- return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
- return in != null ? Futures.immediateFuture(in.getCustomerId())
- : Futures.immediateFuture(null);});
+ return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getCustomerId())
+ : Futures.immediateFuture(null));
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java
index 8a09504..9e3a639 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java
@@ -39,9 +39,8 @@ public class EntitiesRelatedDeviceIdAsyncLoader {
ListenableFuture<List<Device>> asyncDevices = deviceService.findDevicesByQuery(query);
- return Futures.transform(asyncDevices, (AsyncFunction<List<Device>, DeviceId>)
- d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
- : Futures.immediateFuture(null));
+ return Futures.transformAsync(asyncDevices, d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
+ : Futures.immediateFuture(null));
}
private static DeviceSearchQuery buildQuery(EntityId originator, DeviceRelationsQuery deviceRelationsQuery) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
index 55be558..f4de8fc 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
@@ -38,13 +38,11 @@ public class EntitiesRelatedEntityIdAsyncLoader {
EntityRelationsQuery query = buildQuery(originator, relationsQuery);
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByQuery(query);
if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
- return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
- r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
- : Futures.immediateFuture(null));
+ return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
+ : Futures.immediateFuture(null));
} else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
- return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
- r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
- : Futures.immediateFuture(null));
+ return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
+ : Futures.immediateFuture(null));
}
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
index 3d5c64e..a681d68 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
@@ -51,7 +51,7 @@ public class EntitiesTenantIdAsyncLoader {
}
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
- return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
+ return Futures.transformAsync(future, in -> {
return in != null ? Futures.immediateFuture(in.getTenantId())
: Futures.immediateFuture(null);});
}
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 7674549..bac68ff 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -38,8 +38,6 @@ import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
@Slf4j
@@ -186,7 +184,7 @@ public class CoapTransportResource extends CoapResource {
throw new IllegalArgumentException("Unsupported msg type: " + type);
}
log.trace("Processing msg: {}", msg);
- processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
} catch (AdaptorException e) {
log.debug("Failed to decode payload {}", e);
exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 60b2220..320f06e 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -108,8 +108,8 @@ public class CoapServerTest {
@Override
public void process(SessionAwareMsg toActorMsg) {
- if (toActorMsg instanceof ToDeviceActorSessionMsg) {
- AdaptorToSessionActorMsg sessionMsg = ((ToDeviceActorSessionMsg) toActorMsg).getSessionMsg();
+ if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
+ AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
try {
FromDeviceMsg deviceMsg = sessionMsg.getMsg();
ToDeviceMsg toDeviceMsg = null;
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 4ac9799..d26d076 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -220,7 +220,7 @@ public class DeviceApiController {
private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
- processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 5ccce35..0b38817 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.data.security.DeviceX509Credentials;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
}
if (msg != null) {
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
@@ -227,11 +227,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
@@ -261,10 +261,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
deviceSessionCtx.setDisallowAttributeResponses();
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 2056452..f666bb8 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -96,8 +96,8 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
devices.put(deviceName, ctx);
log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
- processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
}
}
@@ -136,7 +136,7 @@ public class GatewaySessionCtx {
JsonConverter.parseWithTs(request, element.getAsJsonObject());
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
@@ -152,7 +152,7 @@ public class GatewaySessionCtx {
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -174,7 +174,7 @@ public class GatewaySessionCtx {
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
@@ -207,7 +207,7 @@ public class GatewaySessionCtx {
request = new BasicGetAttributesRequest(requestId, null, keys);
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
ack(msg);
} else {