thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 4(+3 -1)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 3(+2 -1)
application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java 7(+4 -3)
application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java 115(+55 -60)
application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java 14(+6 -8)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java 13(+8 -5)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 18(+10 -8)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java 162(+23 -139)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java 17(+4 -13)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java 15(+1 -14)
application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java 34(+34 -0)
application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java 67(+67 -0)
application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java 18(+7 -11)
application/src/main/proto/cluster.proto 134(+64 -70)
common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java 40(+40 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java 5(+5 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 54737f8..a1e6f06 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -60,6 +60,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
@@ -103,6 +104,10 @@ public class ActorSystemContext {
@Autowired
@Getter
+ private DataDecodingEncodingService encodingService;
+
+ @Autowired
+ @Getter
private DeviceAuthService deviceAuthService;
@Autowired
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 4d9b8b4..73cf788 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
@@ -45,6 +47,7 @@ import scala.concurrent.duration.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class AppActor extends RuleChainManagerActor {
@@ -89,6 +92,9 @@ public class AppActor extends RuleChainManagerActor {
@Override
protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
+ case SEND_TO_CLUSTER_MSG:
+ onPossibleClusterMsg((SendToClusterMsg) msg);
+ break;
case CLUSTER_EVENT_MSG:
broadcast(msg);
break;
@@ -112,6 +118,16 @@ public class AppActor extends RuleChainManagerActor {
return true;
}
+ private void onPossibleClusterMsg(SendToClusterMsg msg) {
+ Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
+ if (address.isPresent()) {
+ systemContext.getRpcService().tell(
+ systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
+ } else {
+ self().tell(msg.getMsg(), ActorRef.noSender());
+ }
+ }
+
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
//TODO: ashvayka handle this.
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 5112f22..12164e6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -74,6 +74,7 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -521,7 +522,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (sessionAddress.isPresent()) {
ServerAddress address = sessionAddress.get();
logger.debug("{} Forwarding msg: {}", address, response);
- systemContext.getRpcService().tell(sessionAddress.get(), response);
+ systemContext.getRpcService().tell(systemContext.getEncodingService()
+ .convertToProtoDataMessage(sessionAddress.get(), response));
} else {
systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 5e16aab..f6b30bf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -82,7 +82,8 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void sendPluginRpcMsg(RpcMsg msg) {
- this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
+ //ToDO is this a cluster messsage?
+// this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index dcabedc..c43d62d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
@@ -100,7 +101,7 @@ public final class SharedPluginProcessingContext {
}
public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
- forward(msg.getDeviceId(), msg, rpcService::tell);
+ forward(msg.getDeviceId(), msg);
}
public void sendRpcRequest(ToDeviceRpcRequest msg) {
@@ -109,11 +110,11 @@ public final class SharedPluginProcessingContext {
// forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
}
- private <T> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
+ private <T extends TbActorMsg> void forward(DeviceId deviceId, T msg) {
Optional<ServerAddress> instance = routingService.resolveById(deviceId);
if (instance.isPresent()) {
log.trace("[{}] Forwarding msg {} to remote device actor!", pluginId, msg);
- rpcFunction.accept(instance.get(), msg);
+ rpcService.tell(systemContext.getEncodingService().convertToProtoDataMessage(instance.get(), msg));
} else {
log.trace("[{}] Forwarding msg {} to local device actor!", pluginId, msg);
parentActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index bc36dc8..d5c9dab 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,30 +17,11 @@ package org.thingsboard.server.actors.rpc;
import akka.actor.ActorRef;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.util.SerializationUtils;
-import org.springframework.util.StringUtils;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ActorService;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
-
-import java.io.Serializable;
-import java.util.UUID;
/**
* @author Andrew Shvayka
@@ -76,48 +57,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
}
@Override
- public void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg) {
- if (log.isTraceEnabled()) {
- log.trace("{} session [{}] received plugin msg {}", getType(session), session.getRemoteServer(), msg);
- }
- service.onMsg(convert(session.getRemoteServer(), msg));
- }
-
- @Override
- public void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg) {
- log.trace("{} session [{}] received device actor msg {}", getType(session), session.getRemoteServer(), msg);
- service.onMsg((DeviceToDeviceActorMsg) deserialize(msg.getData().toByteArray()));
- }
-
- @Override
- public void onToDeviceActorNotificationRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg) {
- log.trace("{} session [{}] received device actor notification msg {}", getType(session), session.getRemoteServer(), msg);
- service.onMsg((ToDeviceActorNotificationMsg) deserialize(msg.getData().toByteArray()));
- }
-
- @Override
- public void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg((ToDeviceSessionActorMsg) deserialize(msg.getData().toByteArray()));
- }
-
- @Override
- public void onToDeviceRpcRequestRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg(deserialize(session.getRemoteServer(), msg));
- }
-
- @Override
- public void onFromDeviceRpcResponseRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg(deserialize(session.getRemoteServer(), msg));
+ public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
+ log.trace("{} Service [{}] received session actor msg {}", getType(session),
+ session.getRemoteServer(),
+ clusterMessage);
+ service.onRecievedMsg(clusterMessage);
}
- @Override
- public void onToAllNodesRpcMessage(GrpcSession session, ClusterAPIProtos.ToAllNodesRpcMessage msg) {
- log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
- service.onMsg((ToAllNodesMsg) deserialize(msg.getData().toByteArray()));
- }
@Override
public void onError(GrpcSession session, Throwable t) {
@@ -130,37 +76,5 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
return session.isClient() ? "Client" : "Server";
}
- private static PluginRpcMsg convert(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcMessage msg) {
- ClusterAPIProtos.PluginAddress address = msg.getAddress();
- TenantId tenantId = new TenantId(toUUID(address.getTenantId()));
- PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
- RpcMsg rpcMsg = new RpcMsg(serverAddress, msg.getClazz(), msg.getData().toByteArray());
- return new PluginRpcMsg(tenantId, pluginId, rpcMsg);
- }
-
- private static UUID toUUID(ClusterAPIProtos.Uid uid) {
- return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
- }
-
- private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
- TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
- DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
-
- ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
- ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
-
- return new ToDeviceRpcRequestActorMsg(serverAddress, request);
- }
-
- private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
- RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
- FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
- return new ToPluginRpcResponseDeviceMsg(null, null, response);
- }
-
- @SuppressWarnings("unchecked")
- private static <T extends Serializable> T deserialize(byte[] data) {
- return (T) SerializationUtils.deserialize(data);
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
index 3718a22..2dd949e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
@@ -23,5 +23,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
*/
@Data
public final class RpcBroadcastMsg {
- private final ClusterAPIProtos.ToRpcServerMessage msg;
+ private final ClusterAPIProtos.ClusterMessage msg;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index ba20013..ab21e5d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -40,7 +40,7 @@ public class RpcManagerActor extends ContextAwareActor {
private final Map<ServerAddress, SessionActorInfo> sessionActors;
- private final Map<ServerAddress, Queue<ClusterAPIProtos.ToRpcServerMessage>> pendingMsgs;
+ private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
private final ServerAddress instance;
@@ -65,8 +65,8 @@ public class RpcManagerActor extends ContextAwareActor {
@Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof RpcSessionTellMsg) {
- onMsg((RpcSessionTellMsg) msg);
+ if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+ onMsg((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcBroadcastMsg) {
onMsg((RpcBroadcastMsg) msg);
} else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -84,27 +84,32 @@ public class RpcManagerActor extends ContextAwareActor {
private void onMsg(RpcBroadcastMsg msg) {
log.debug("Forwarding msg to session actors {}", msg);
- sessionActors.keySet().forEach(address -> onMsg(new RpcSessionTellMsg(address, msg.getMsg())));
+ sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
}
- private void onMsg(RpcSessionTellMsg msg) {
- ServerAddress address = msg.getServerAddress();
- SessionActorInfo session = sessionActors.get(address);
- if (session != null) {
- log.debug("{} Forwarding msg to session actor", address);
- session.actor.tell(msg, ActorRef.noSender());
- } else {
- log.debug("{} Storing msg to pending queue", address);
- Queue<ClusterAPIProtos.ToRpcServerMessage> queue = pendingMsgs.get(address);
- if (queue == null) {
- queue = new LinkedList<>();
- pendingMsgs.put(address, queue);
+ private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
+ if (msg.hasServerAdresss()) {
+ ServerAddress address = new ServerAddress(msg.getServerAdresss().getHost(),
+ msg.getServerAdresss().getPort());
+ SessionActorInfo session = sessionActors.get(address);
+ if (session != null) {
+ log.debug("{} Forwarding msg to session actor", address);
+ session.getActor().tell(msg, ActorRef.noSender());
+ } else {
+ log.debug("{} Storing msg to pending queue", address);
+ Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
+ if (queue == null) {
+ queue = new LinkedList<>();
+ pendingMsgs.put(new ServerAddress(
+ msg.getServerAdresss().getHost(), msg.getServerAdresss().getPort()), queue);
+ }
+ queue.add(msg);
}
- queue.add(msg.getMsg());
+ } else {
+ logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
}
}
-
@Override
public void postStop() {
sessionActors.clear();
@@ -167,10 +172,10 @@ public class RpcManagerActor extends ContextAwareActor {
private void register(ServerAddress remoteAddress, UUID uuid, ActorRef sender) {
sessionActors.put(remoteAddress, new SessionActorInfo(uuid, sender));
log.debug("[{}][{}] Registering session actor.", remoteAddress, uuid);
- Queue<ClusterAPIProtos.ToRpcServerMessage> data = pendingMsgs.remove(remoteAddress);
+ Queue<ClusterAPIProtos.ClusterMessage> data = pendingMsgs.remove(remoteAddress);
if (data != null) {
log.debug("[{}][{}] Forwarding {} pending messages.", remoteAddress, uuid, data.size());
- data.forEach(msg -> sender.tell(new RpcSessionTellMsg(remoteAddress, msg), ActorRef.noSender()));
+ data.forEach(msg -> sender.tell(new RpcSessionTellMsg(msg), ActorRef.noSender()));
} else {
log.debug("[{}][{}] No pending messages to forward.", remoteAddress, uuid);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index a187444..9d0f3e8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -32,6 +32,8 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
import java.util.UUID;
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE;
+
/**
* @author Andrew Shvayka
*/
@@ -56,15 +58,15 @@ public class RpcSessionActor extends ContextAwareActor {
@Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof RpcSessionTellMsg) {
- tell((RpcSessionTellMsg) msg);
+ if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+ tell((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcSessionCreateRequestMsg) {
initSession((RpcSessionCreateRequestMsg) msg);
}
}
- private void tell(RpcSessionTellMsg msg) {
- session.sendMsg(msg.getMsg());
+ private void tell(ClusterAPIProtos.ClusterMessage msg) {
+ session.sendMsg(msg);
}
@Override
@@ -91,7 +93,7 @@ public class RpcSessionActor extends ContextAwareActor {
session.initInputStream();
ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
- StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream = stub.handlePluginMsgs(session.getInputStream());
+ StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream = stub.handleMsgs(session.getInputStream());
session.setOutputStream(outputStream);
session.initOutputStream();
@@ -115,11 +117,10 @@ public class RpcSessionActor extends ContextAwareActor {
}
}
- private ClusterAPIProtos.ToRpcServerMessage toConnectMsg() {
+ private ClusterAPIProtos.ClusterMessage toConnectMsg() {
ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress();
- return ClusterAPIProtos.ToRpcServerMessage.newBuilder().setConnectMsg(
- ClusterAPIProtos.ConnectRpcMessage.newBuilder().setServerAddress(
- ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost()).setPort(instance.getPort()).build()).build()).build();
-
+ return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAdresss(
+ ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost())
+ .setPort(instance.getPort()).build()).build();
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
index 5bcf1d6..0c1136e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
@@ -30,6 +30,6 @@ public final class RpcSessionCreateRequestMsg {
private final UUID msgUid;
private final ServerAddress remoteAddress;
- private final StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver;
+ private final StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
index 5a61044..858e3aa 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
@@ -24,6 +24,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
*/
@Data
public final class RpcSessionTellMsg {
- private final ServerAddress serverAddress;
- private final ClusterAPIProtos.ToRpcServerMessage msg;
+ private final ClusterAPIProtos.ClusterMessage msg;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index e15eb2f..5a097df 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
@@ -27,10 +28,11 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
- void onMsg(ServiceToRuleEngineMsg msg);
+ void onMsg(SendToClusterMsg msg);
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
+ void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index df0d122..05eda75 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -19,6 +19,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -32,8 +33,10 @@ import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
@@ -46,6 +49,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -57,6 +61,9 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Optional;
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.RPC_BROADCAST_MSG;
+
@Service
@Slf4j
public class DefaultActorService implements ActorService {
@@ -127,7 +134,7 @@ public class DefaultActorService implements ActorService {
}
@Override
- public void onMsg(ServiceToRuleEngineMsg msg) {
+ public void onMsg(SendToClusterMsg msg) {
appActor.tell(msg, ActorRef.noSender());
}
@@ -149,53 +156,7 @@ public class DefaultActorService implements ActorService {
appActor.tell(msg, ActorRef.noSender());
}
- @Override
- public void onMsg(ToPluginActorMsg msg) {
- log.trace("Processing plugin rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(DeviceToDeviceActorMsg msg) {
- log.trace("Processing device rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(ToDeviceActorNotificationMsg msg) {
- log.trace("Processing notification rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(ToDeviceSessionActorMsg msg) {
- log.trace("Processing session rpc msg: {}", msg);
- sessionManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(ToAllNodesMsg msg) {
- log.trace("Processing broadcast rpc msg: {}", msg);
- appActor.tell(msg, ActorRef.noSender());
- }
- @Override
- public void onMsg(RpcSessionCreateRequestMsg msg) {
- log.trace("Processing session create msg: {}", msg);
- rpcManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(RpcSessionTellMsg msg) {
- log.trace("Processing session rpc msg: {}", msg);
- rpcManagerActor.tell(msg, ActorRef.noSender());
- }
-
- @Override
- public void onMsg(RpcBroadcastMsg msg) {
- log.trace("Processing broadcast rpc msg: {}", msg);
- rpcManagerActor.tell(msg, ActorRef.noSender());
- }
@Override
public void onServerAdded(ServerInstance server) {
@@ -223,28 +184,29 @@ public class DefaultActorService implements ActorService {
@Override
public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
- Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
- if (address.isPresent()) {
- rpcService.tell(address.get(), msg);
- } else {
- onMsg(msg);
- }
+ appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
}
@Override
public void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType) {
log.trace("[{}] Processing onDeviceNameOrTypeUpdate event, deviceName: {}, deviceType: {}", deviceId, deviceName, deviceType);
DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType);
- Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
- if (address.isPresent()) {
- rpcService.tell(address.get(), msg);
- } else {
- onMsg(msg);
- }
+ appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
+ }
+
+ @Override
+ public void onMsg(ServiceToRuleEngineMsg msg) {
+ appActor.tell(msg, ActorRef.noSender());
}
public void broadcast(ToAllNodesMsg msg) {
- rpcService.broadcast(msg);
+ actorContext.getEncodingService().encode(msg);
+ rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
+ .newBuilder()
+ .setPayload(ByteString
+ .copyFrom(actorContext.getEncodingService().encode(msg)))
+ .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
+ .build()));
appActor.tell(msg, ActorRef.noSender());
}
@@ -253,4 +215,37 @@ public class DefaultActorService implements ActorService {
this.sessionManagerActor.tell(msg, ActorRef.noSender());
this.rpcManagerActor.tell(msg, ActorRef.noSender());
}
+
+ @Override
+ public void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg) {
+ switch(msg.getMessageType()) {
+ case CLUSTER_NETWORK_SERVER_DATA_MESSAGE:
+ java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
+ .decode(msg.getPayload().toByteArray());
+ if (decodedMsg.isPresent()) {
+ appActor.tell(decodedMsg.get(), ActorRef.noSender());
+ } else {
+ log.error("Error during decoding cluster proto message");
+ }
+ break;
+ case TO_ALL_NODES_MSG:
+ //ToDo
+ break;
+ }
+ }
+
+ @Override
+ public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
+ rpcManagerActor.tell(msg, ActorRef.noSender());
+ }
+
+ @Override
+ public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
+ rpcManagerActor.tell(msg, ActorRef.noSender());
+ }
+
+ @Override
+ public void onBroadcastMsg(RpcBroadcastMsg msg) {
+ rpcManagerActor.tell(msg, ActorRef.noSender());
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
index bbaef5d..0e0175b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
@@ -87,22 +88,19 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
}
protected Optional<ServerAddress> forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
+
Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
if (!newAddress.equals(oldAddress)) {
- if (newAddress.isPresent()) {
- systemContext.getRpcService().tell(newAddress.get(),
- toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
- } else {
- getAppActor().tell(toForward, ctx.self());
- }
+ getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward
+ .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self());
}
return newAddress;
}
protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> address) {
if (address.isPresent()) {
- systemContext.getRpcService().tell(address.get(),
- toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
+ systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(),
+ toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer())));
} else {
getAppActor().tell(toForward, ctx.self());
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
index 03c9694..bc386eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
@@ -20,7 +20,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
/**
* @author Andrew Shvayka
@@ -29,8 +29,6 @@ import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
@EqualsAndHashCode(exclude = {"serverInfo", "serverAddress"})
public final class ServerInstance implements Comparable<ServerInstance> {
- @Getter(AccessLevel.PACKAGE)
- private final ServerInfo serverInfo;
@Getter
private final String host;
@Getter
@@ -38,8 +36,13 @@ public final class ServerInstance implements Comparable<ServerInstance> {
@Getter
private final ServerAddress serverAddress;
- public ServerInstance(ServerInfo serverInfo) {
- this.serverInfo = serverInfo;
+ public ServerInstance(ServerAddress serverAddress) {
+ this.serverAddress = serverAddress;
+ this.host = serverAddress.getHost();
+ this.port = serverAddress.getPort();
+ }
+
+ public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
this.host = serverInfo.getHost();
this.port = serverInfo.getPort();
this.serverAddress = new ServerAddress(host, port);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 818d2b1..fbda119 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -15,8 +15,9 @@
*/
package org.thingsboard.server.service.cluster.discovery;
-import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
@@ -33,13 +34,13 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.utils.MiscUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import java.io.IOException;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -113,7 +114,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
nodePath = client.create()
.creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", self.getServerInfo().toByteArray());
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
} catch (Exception e) {
log.error("Failed to create ZK node", e);
@@ -144,8 +145,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.filter(cd -> !cd.getPath().equals(nodePath))
.map(cd -> {
try {
- return new ServerInstance(ServerInfo.parseFrom(cd.getData()));
- } catch (InvalidProtocolBufferException e) {
+ return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+ } catch (NoSuchElementException e) {
log.error("Failed to decode ZK node", e);
throw new RuntimeException(e);
}
@@ -186,8 +187,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
ServerInstance instance;
try {
- instance = new ServerInstance(ServerInfo.parseFrom(data.getData()));
- } catch (IOException e) {
+ ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
+ instance = new ServerInstance(serverAddress);
+ } catch (SerializationException e) {
log.error("Failed to decode server instance for node {}", data.getPath(), e);
throw e;
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index 27334c6..49c1a23 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -25,26 +25,20 @@ import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
+
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -64,7 +58,8 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
private ServerInstance instance;
- private ConcurrentMap<UUID, RpcSessionCreationFuture> pendingSessionMap = new ConcurrentHashMap<>();
+ private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap =
+ new ConcurrentHashMap<>();
public void init(RpcMsgListener listener) {
this.listener = listener;
@@ -82,11 +77,11 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
- public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> msg) {
- RpcSessionCreationFuture future = pendingSessionMap.remove(msgUid);
- if (future != null) {
+ public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream) {
+ BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = pendingSessionMap.remove(msgUid);
+ if (queue != null) {
try {
- future.onMsg(msg);
+ queue.put(inputStream);
} catch (InterruptedException e) {
log.warn("Failed to report created session!");
Thread.currentThread().interrupt();
@@ -97,11 +92,13 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
- public StreamObserver<ClusterAPIProtos.ToRpcServerMessage> handlePluginMsgs(StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver) {
+ public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(
+ StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver) {
log.info("Processing new session.");
return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver));
}
+
@PreDestroy
public void stop() {
if (server != null) {
@@ -117,65 +114,18 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
}
- @Override
- public void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceActorRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceActorNotificationRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToPluginRpcResponseRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
-
- @Override
- public void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToDeviceSessionActorRpcMsg(toProtoMsg(toForward)).build();
- tell(serverAddress, msg);
- }
@Override
- public void tell(PluginRpcMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToPluginRpcMsg(toProtoMsg(toForward)).build();
- tell(toForward.getRpcMsg().getServerAddress(), msg);
- }
-
- @Override
- public void broadcast(ToAllNodesMsg toForward) {
- ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
- .setToAllNodesRpcMsg(toProtoMsg(toForward)).build();
- listener.onMsg(new RpcBroadcastMsg(msg));
- }
-
- private void tell(ServerAddress serverAddress, ClusterAPIProtos.ToRpcServerMessage msg) {
- listener.onMsg(new RpcSessionTellMsg(serverAddress, msg));
+ public void broadcast(RpcBroadcastMsg msg) {
+ listener.onBroadcastMsg(msg);
}
- private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> createSession(RpcSessionCreateRequestMsg msg) {
- RpcSessionCreationFuture future = new RpcSessionCreationFuture();
- pendingSessionMap.put(msg.getMsgUid(), future);
- listener.onMsg(msg);
+ private StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg msg) {
+ BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = new ArrayBlockingQueue<>(1);
+ pendingSessionMap.put(msg.getMsgUid(), queue);
+ listener.onRpcSessionCreateRequestMsg(msg);
try {
- StreamObserver<ClusterAPIProtos.ToRpcServerMessage> observer = future.get();
+ StreamObserver<ClusterAPIProtos.ClusterMessage> observer = queue.take();
log.info("Processed new session.");
return observer;
} catch (Exception e) {
@@ -184,76 +134,10 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
}
- private static ClusterAPIProtos.ToDeviceActorRpcMessage toProtoMsg(DeviceToDeviceActorMsg msg) {
- return ClusterAPIProtos.ToDeviceActorRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
-
- private static ClusterAPIProtos.ToDeviceActorNotificationRpcMessage toProtoMsg(ToDeviceActorNotificationMsg msg) {
- return ClusterAPIProtos.ToDeviceActorNotificationRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
-
- private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
- ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
- ToDeviceRpcRequest request = msg.getMsg();
-
- builder.setDeviceTenantId(toUid(msg.getTenantId()));
- builder.setDeviceId(toUid(msg.getDeviceId()));
-
- builder.setMsgId(toUid(request.getId()));
- builder.setOneway(request.isOneway());
- builder.setExpTime(request.getExpirationTime());
- builder.setMethod(request.getBody().getMethod());
- builder.setParams(request.getBody().getParams());
-
- return builder.build();
- }
-
- private static ClusterAPIProtos.ToPluginRpcResponseRpcMessage toProtoMsg(ToPluginRpcResponseDeviceMsg msg) {
- ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
- FromDeviceRpcResponse request = msg.getResponse();
-
- builder.setMsgId(toUid(request.getId()));
- request.getResponse().ifPresent(builder::setResponse);
- request.getError().ifPresent(e -> builder.setError(e.name()));
-
- return builder.build();
- }
-
- private ClusterAPIProtos.ToAllNodesRpcMessage toProtoMsg(ToAllNodesMsg msg) {
- return ClusterAPIProtos.ToAllNodesRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
-
-
- private ClusterAPIProtos.ToPluginRpcMessage toProtoMsg(PluginRpcMsg msg) {
- return ClusterAPIProtos.ToPluginRpcMessage.newBuilder()
- .setClazz(msg.getRpcMsg().getMsgClazz())
- .setData(ByteString.copyFrom(msg.getRpcMsg().getMsgData()))
- .setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
- .setTenantId(toUid(msg.getPluginTenantId().getId()))
- .setPluginId(toUid(msg.getPluginId().getId()))
- .build()
- ).build();
- }
-
- private static ClusterAPIProtos.Uid toUid(EntityId uuid) {
- return toUid(uuid.getId());
- }
-
- private static ClusterAPIProtos.Uid toUid(UUID uuid) {
- return ClusterAPIProtos.Uid.newBuilder().setPluginUuidMsb(uuid.getMostSignificantBits()).setPluginUuidLsb(
- uuid.getLeastSignificantBits()).build();
+ @Override
+ public void tell(ClusterAPIProtos.ClusterMessage message) {
+ listener.onSendMsg(message);
}
- private static ClusterAPIProtos.ToDeviceSessionActorRpcMessage toProtoMsg(ToDeviceSessionActorMsg msg) {
- return ClusterAPIProtos.ToDeviceSessionActorRpcMessage.newBuilder().setData(
- ByteString.copyFrom(SerializationUtils.serialize(msg))
- ).build();
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 6aefe46..075857c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.service.cluster.rpc;
import io.grpc.stub.StreamObserver;
+import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
@@ -35,20 +36,10 @@ public interface ClusterRpcService {
void init(RpcMsgListener listener);
- void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward);
+ void broadcast(RpcBroadcastMsg msg);
- void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward);
+ void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream);
- void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
-
- void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
-
- void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
-
- void tell(PluginRpcMsg toForward);
-
- void broadcast(ToAllNodesMsg msg);
-
- void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
+ void tell(ClusterAPIProtos.ClusterMessage message);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index c403895..e56fb75 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -33,8 +33,8 @@ public final class GrpcSession implements Closeable {
private final UUID sessionId;
private final boolean client;
private final GrpcSessionListener listener;
- private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream;
- private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream;
+ private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
+ private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
private boolean connected;
private ServerAddress remoteServer;
@@ -56,17 +56,17 @@ public final class GrpcSession implements Closeable {
}
public void initInputStream() {
- this.inputStream = new StreamObserver<ClusterAPIProtos.ToRpcServerMessage>() {
+ this.inputStream = new StreamObserver<ClusterAPIProtos.ClusterMessage>() {
@Override
- public void onNext(ClusterAPIProtos.ToRpcServerMessage msg) {
- if (!connected && msg.hasConnectMsg()) {
+ public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
+ if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
connected = true;
- ClusterAPIProtos.ServerAddress rpcAddress = msg.getConnectMsg().getServerAddress();
+ ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAdresss().getHost(), clusterMessage.getServerAdresss().getPort());
remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
listener.onConnected(GrpcSession.this);
}
if (connected) {
- handleToRpcServerMessage(msg);
+ listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage);
}
}
@@ -83,37 +83,13 @@ public final class GrpcSession implements Closeable {
};
}
- private void handleToRpcServerMessage(ClusterAPIProtos.ToRpcServerMessage msg) {
- if (msg.hasToPluginRpcMsg()) {
- listener.onToPluginRpcMsg(GrpcSession.this, msg.getToPluginRpcMsg());
- }
- if (msg.hasToDeviceActorRpcMsg()) {
- listener.onToDeviceActorRpcMsg(GrpcSession.this, msg.getToDeviceActorRpcMsg());
- }
- if (msg.hasToDeviceSessionActorRpcMsg()) {
- listener.onToDeviceSessionActorRpcMsg(GrpcSession.this, msg.getToDeviceSessionActorRpcMsg());
- }
- if (msg.hasToDeviceActorNotificationRpcMsg()) {
- listener.onToDeviceActorNotificationRpcMsg(GrpcSession.this, msg.getToDeviceActorNotificationRpcMsg());
- }
- if (msg.hasToDeviceRpcRequestRpcMsg()) {
- listener.onToDeviceRpcRequestRpcMsg(GrpcSession.this, msg.getToDeviceRpcRequestRpcMsg());
- }
- if (msg.hasToPluginRpcResponseRpcMsg()) {
- listener.onFromDeviceRpcResponseRpcMsg(GrpcSession.this, msg.getToPluginRpcResponseRpcMsg());
- }
- if (msg.hasToAllNodesRpcMsg()) {
- listener.onToAllNodesRpcMessage(GrpcSession.this, msg.getToAllNodesRpcMsg());
- }
- }
-
public void initOutputStream() {
if (client) {
listener.onConnected(GrpcSession.this);
}
}
- public void sendMsg(ClusterAPIProtos.ToRpcServerMessage msg) {
+ public void sendMsg(ClusterAPIProtos.ClusterMessage msg) {
outputStream.onNext(msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
index 44e0693..266b1f5 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
@@ -26,20 +26,7 @@ public interface GrpcSessionListener {
void onDisconnected(GrpcSession session);
- void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg);
-
- void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg);
-
- void onToDeviceActorNotificationRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg);
-
- void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg);
-
- void onToAllNodesRpcMessage(GrpcSession grpcSession, ClusterAPIProtos.ToAllNodesRpcMessage toAllNodesRpcMessage);
-
- void onToDeviceRpcRequestRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg);
-
- void onFromDeviceRpcResponseRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg);
+ void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage);
void onError(GrpcSession session, Throwable t);
-
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
index 5d26fae..88d17e0 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
@@ -17,32 +17,15 @@ package org.thingsboard.server.service.cluster.rpc;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
/**
* @author Andrew Shvayka
*/
-public interface RpcMsgListener {
-
- void onMsg(DeviceToDeviceActorMsg msg);
-
- void onMsg(ToDeviceActorNotificationMsg msg);
-
- void onMsg(ToDeviceSessionActorMsg msg);
-
- void onMsg(ToAllNodesMsg nodeMsg);
-
- void onMsg(ToPluginActorMsg msg);
-
- void onMsg(RpcSessionCreateRequestMsg msg);
-
- void onMsg(RpcSessionTellMsg rpcSessionTellMsg);
-
- void onMsg(RpcBroadcastMsg rpcBroadcastMsg);
+public interface RpcMsgListener {
+ void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg);
+ void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
+ void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
+ void onBroadcastMsg(RpcBroadcastMsg msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
new file mode 100644
index 0000000..248e9f3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+public interface DataDecodingEncodingService {
+
+ Optional<TbActorMsg> decode(byte[] byteArray);
+
+ byte[] encode(TbActorMsg msq);
+
+ ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+ TbActorMsg msg);
+
+}
+
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
new file mode 100644
index 0000000..f6a365e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
+
+
+@Slf4j
+@Service
+public class ProtoWithJavaSerializationDecodingEncodingService implements DataDecodingEncodingService {
+
+
+ @Override
+ public Optional<TbActorMsg> decode(byte[] byteArray) {
+ try {
+ TbActorMsg msg = (TbActorMsg) SerializationUtils.deserialize(byteArray);
+ return Optional.of(msg);
+
+ } catch (IllegalArgumentException e) {
+ log.error("Error during deserialization message, [{}]", e.getMessage());
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public byte[] encode(TbActorMsg msq) {
+ return SerializationUtils.serialize(msq);
+ }
+
+ @Override
+ public ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+ TbActorMsg msg) {
+ return ClusterAPIProtos.ClusterMessage
+ .newBuilder()
+ .setServerAdresss(ClusterAPIProtos.ServerAddress
+ .newBuilder()
+ .setHost(serverAddress.getHost())
+ .setPort(serverAddress.getPort())
+ .build())
+ .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
+ .setPayload(ByteString.copyFrom(encode(msg))).build();
+
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 9674490..6a1a69a 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
@@ -38,6 +40,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.security.model.SecurityUser;
@@ -135,23 +138,16 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
@Override
public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
- forward(deviceId, rpcMsg, rpcService::tell);
+ forward(deviceId, rpcMsg);
}
private void sendRpcRequest(ToDeviceRpcRequest msg) {
log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
- forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
+ forward(msg.getDeviceId(), rpcMsg);
}
- private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
- Optional<ServerAddress> instance = routingService.resolveById(deviceId);
- if (instance.isPresent()) {
- log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
- rpcFunction.accept(instance.get(), msg);
- } else {
- log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
- actorService.onMsg(msg);
- }
+ private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
+ actorService.onMsg(new SendToClusterMsg(deviceId, msg));
}
}
application/src/main/proto/cluster.proto 134(+64 -70)
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index fc92487..2ecffe7 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -19,79 +19,73 @@ package cluster;
option java_package = "org.thingsboard.server.gen.cluster";
option java_outer_classname = "ClusterAPIProtos";
-message ServerAddress {
- string host = 1;
- int32 port = 2;
-}
-
-message Uid {
- sint64 pluginUuidMsb = 1;
- sint64 pluginUuidLsb = 2;
-}
-
-message PluginAddress {
- Uid pluginId = 1;
- Uid tenantId = 2;
-}
-
-message ToPluginRpcMessage {
- PluginAddress address = 1;
- int32 clazz = 2;
- bytes data = 3;
-}
-
-message ToDeviceActorRpcMessage {
- bytes data = 1;
-}
-
-message ToDeviceSessionActorRpcMessage {
- bytes data = 1;
-}
-
-message ToDeviceActorNotificationRpcMessage {
- bytes data = 1;
-}
-
-message ToAllNodesRpcMessage {
- bytes data = 1;
-}
-
-message ConnectRpcMessage {
- ServerAddress serverAddress = 1;
-}
-
-message ToDeviceRpcRequestRpcMessage {
- Uid deviceTenantId = 2;
- Uid deviceId = 3;
-
- Uid msgId = 4;
- bool oneway = 5;
- int64 expTime = 6;
- string method = 7;
- string params = 8;
-}
-
-message ToPluginRpcResponseRpcMessage {
- Uid msgId = 2;
- string response = 3;
- string error = 4;
-}
-
-message ToRpcServerMessage {
- ConnectRpcMessage connectMsg = 1;
- ToPluginRpcMessage toPluginRpcMsg = 2;
- ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
- ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
- ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
- ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
- ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
- ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
-}
+//message Uid {
+// sint64 pluginUuidMsb = 1;
+// sint64 pluginUuidLsb = 2;
+//}
+//
+//message PluginAddress {
+// Uid pluginId = 1;
+// Uid tenantId = 2;
+//}
+//
+//message ToPluginRpcMessage {
+// PluginAddress address = 1;
+// int32 clazz = 2;
+// bytes data = 3;
+//}
+//
+//message ToDeviceActorRpcMessage {
+// bytes data = 1;
+//}
+//
+//message ToDeviceSessionActorRpcMessage {
+// bytes data = 1;
+//}
+//
+//message ToDeviceActorNotificationRpcMessage {
+// bytes data = 1;
+//}
+//
+//message ToAllNodesRpcMessage {
+// bytes data = 1;
+//}
+//
+//message ConnectRpcMessage {
+// ServerAddress serverAddress = 1;
+//}
+//
+//message ToDeviceRpcRequestRpcMessage {
+// Uid deviceTenantId = 2;
+// Uid deviceId = 3;
+//
+// Uid msgId = 4;
+// bool oneway = 5;
+// int64 expTime = 6;
+// string method = 7;
+// string params = 8;
+//}
+//
+//message ToPluginRpcResponseRpcMessage {
+// Uid msgId = 2;
+// string response = 3;
+// string error = 4;
+//}
+//
+//message ToRpcServerMessage {
+// ConnectRpcMessage connectMsg = 1;
+// ToPluginRpcMessage toPluginRpcMsg = 2;
+// ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
+// ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
+// ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
+// ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
+// ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
+// ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
+//}
service ClusterRpcService {
- rpc handlePluginMsgs(stream ToRpcServerMessage) returns (stream ToRpcServerMessage) {}
+ rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
}
-
message ClusterMessage {
MessageType messageType = 1;
MessageMataInfo messageMetaInfo = 2;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
new file mode 100644
index 0000000..d3bc8e9
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.cluster;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+@Data
+public class SendToClusterMsg implements TbActorMsg {
+
+ private TbActorMsg msg;
+ private EntityId entityId;
+
+ public SendToClusterMsg(EntityId entityId, TbActorMsg msg) {
+ this.entityId = entityId;
+ this.msg = msg;
+ }
+
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SEND_TO_CLUSTER_MSG;
+ }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
index 1dc33c0..877689d 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
@@ -15,10 +15,12 @@
*/
package org.thingsboard.server.common.msg.cluster;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
import java.io.Serializable;
/**
* @author Andrew Shvayka
*/
-public interface ToAllNodesMsg extends Serializable {
+public interface ToAllNodesMsg extends Serializable, TbActorMsg {
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java
index 6a70891..d01c2b4 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.common.msg.core;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
@@ -44,4 +45,8 @@ public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
return "BasicToSessionResponseMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
}
+ @Override
+ public MsgType getMsgType() {
+ return null;
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java
index f3e0c0f..d190579 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.msg.core;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@@ -23,7 +24,7 @@ import java.io.Serializable;
/**
* @author Andrew Shvayka
*/
-public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable {
+public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg {
ToDeviceMsg getMsg();
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index d63456e..c7a96db 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -29,6 +29,11 @@ public enum MsgType {
CLUSTER_EVENT_MSG,
/**
+ * All messages, could be send to cluster
+ */
+ SEND_TO_CLUSTER_MSG,
+
+ /**
* ADDED/UPDATED/DELETED events for main entities.
*
* See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index c104281..2ee8432 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -20,7 +20,6 @@ import lombok.ToString;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
-import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;