thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 14(+8 -6)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 8(+4 -4)
application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java 64(+40 -24)
application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java 9(+1 -8)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index e1c7e08..d453e59 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
@@ -112,21 +113,27 @@ public class AppActor extends RuleChainManagerActor {
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg);
break;
+ case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
+ onToDeviceSessionMsg((BasicActorSystemToDeviceSessionActorMsg) msg);
default:
return false;
}
return true;
}
+ private void onToDeviceSessionMsg(BasicActorSystemToDeviceSessionActorMsg msg) {
+ systemContext.getSessionManagerActor().tell(msg, self());
+ }
+
private void onPossibleClusterMsg(SendToClusterMsg msg) {
Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
- if (address.isPresent()) {
- systemContext.getRpcService().tell(
- systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
- } else {
- self().tell(msg.getMsg(), ActorRef.noSender());
- }
+ if (address.isPresent()) {
+ systemContext.getRpcService().tell(
+ systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
+ } else {
+ self().tell(msg.getMsg(), ActorRef.noSender());
}
+ }
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 625d6e9..d416497 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -163,7 +163,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (request.isOneway() && sent) {
logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
- systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
+ systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(msg.getMsg().getId(), msg.getServerAddress(), null, null));
} else {
registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
}
@@ -185,8 +185,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
- systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
- null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
+ systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+ requestMd.getMsg().getServerAddress(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
}
}
@@ -234,11 +234,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
return entry -> {
+ ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
ToDeviceRpcRequestBody body = request.getBody();
if (request.isOneway()) {
sentOneWayIds.add(entry.getKey());
- systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(request.getId(), null, null));
+ systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
}
ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
entry.getKey(),
@@ -360,7 +361,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
}
TbMsgMetaData metaData = defaultMetaData.copy();
- metaData.putValue("ts", entry.getKey()+"");
+ metaData.putValue("ts", entry.getKey() + "");
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
pushToRuleEngineWithTimeout(context, tbMsg, msgData);
}
@@ -451,7 +452,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
- systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null));
+ systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+ requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
} else {
logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index dcc6e5c..3f3f70b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -114,6 +114,7 @@ public class RpcManagerActor extends ContextAwareActor {
logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
}
}
+
@Override
public void postStop() {
sessionActors.clear();
@@ -157,7 +158,7 @@ public class RpcManagerActor extends ContextAwareActor {
private void onSessionClose(boolean reconnect, ServerAddress remoteAddress) {
log.debug("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
- if (context().sender().equals(sessionRef.actor)) {
+ if (context().sender() != null && context().sender().equals(sessionRef.actor)) {
sessionActors.remove(remoteAddress);
pendingMsgs.remove(remoteAddress);
if (reconnect) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b888bc3..17ffd16 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -27,7 +27,6 @@ import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -224,7 +223,7 @@ class DefaultTbContext implements TbContext {
public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
- mainCtx.getDeviceRpcService().process(request, response -> {
+ mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> {
consumer.accept(RuleEngineDeviceRpcResponse.builder()
.deviceId(src.getDeviceId())
.requestId(src.getRequestId())
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index cc73eae..7d560db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 82ca95f..3507f24 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -233,6 +233,9 @@ public class DefaultActorService implements ActorService {
case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
break;
+ case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
+ actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
+ break;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
index 1422a34..e39a3f6 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -128,7 +128,7 @@ public class RpcController extends BaseController {
timeout,
body
);
- deviceRpcService.process(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
+ deviceRpcService.processRpcRequestToDevice(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index bd0071a..6630d7c 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.rpc;
+import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -28,11 +29,15 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.telemetry.sub.Subscription;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -57,14 +62,10 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
@Autowired
private ActorService actorService;
- @Autowired
- private AuditLogService auditLogService;
-
private ScheduledExecutorService rpcCallBackExecutor;
private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
-
@PostConstruct
public void initExecutor() {
rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
@@ -78,7 +79,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
}
@Override
- public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
+ public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
sendRpcRequest(request);
UUID requestId = request.getId();
@@ -89,33 +90,48 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
if (consumer != null) {
- consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
+ consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
}
}, timeout, TimeUnit.MILLISECONDS);
}
@Override
- public void process(ToDeviceRpcRequest request, ServerAddress originator) {
-// if (pluginServerAddress.isPresent()) {
-// systemContext.getRpcService().tell(pluginServerAddress.get(), responsePluginMsg);
-// logger.debug("[{}] Rpc command response sent to remote plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
-// } else {
-// context.parent().tell(responsePluginMsg, ActorRef.noSender());
-// logger.debug("[{}] Rpc command response sent to local plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
-// }
+ public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
+ log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
+ if (routingService.getCurrentServer().equals(response.getServerAddress())) {
+ UUID requestId = response.getId();
+ Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+ if (consumer != null) {
+ consumer.accept(response);
+ } else {
+ log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+ }
+ } else {
+ ClusterAPIProtos.FromDeviceRPCResponseProto.Builder builder = ClusterAPIProtos.FromDeviceRPCResponseProto.newBuilder();
+ builder.setRequestIdMSB(response.getId().getMostSignificantBits());
+ builder.setRequestIdLSB(response.getId().getLeastSignificantBits());
+ response.getResponse().ifPresent(builder::setResponse);
+ if (response.getError().isPresent()) {
+ builder.setError(response.getError().get().ordinal());
+ } else {
+ builder.setError(-1);
+ }
+ rpcService.tell(response.getServerAddress(), ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray());
+ }
}
@Override
- public void process(FromDeviceRpcResponse response) {
- log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
- //TODO: send to another server if needed.
- UUID requestId = response.getId();
- Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
- if (consumer != null) {
- consumer.accept(response);
- } else {
- log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+ public void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.FromDeviceRPCResponseProto proto;
+ try {
+ proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
}
+ RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
+ FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), routingService.getCurrentServer(),
+ proto.getResponse(), error);
+ processRpcResponseFromDevice(response);
}
@Override
@@ -125,8 +141,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
}
private void sendRpcRequest(ToDeviceRpcRequest msg) {
+ ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(routingService.getCurrentServer(), msg);
log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
- ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
forward(msg.getDeviceId(), rpcMsg);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
index f6e3543..8b4d47b 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
@@ -27,12 +27,11 @@ import java.util.function.Consumer;
*/
public interface DeviceRpcService {
- void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
+ void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
- void process(ToDeviceRpcRequest request, ServerAddress originator);
-
- void process(FromDeviceRpcResponse response);
+ void processRpcResponseFromDevice(FromDeviceRpcResponse response);
void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
+ void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] bytes);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
index 855aed4..9c3ce9a 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
@@ -19,6 +19,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.thingsboard.rule.engine.api.RpcError;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import java.util.Optional;
import java.util.UUID;
@@ -31,6 +32,8 @@ import java.util.UUID;
public class FromDeviceRpcResponse {
@Getter
private final UUID id;
+ @Getter
+ private final ServerAddress serverAddress;
private final String response;
private final RpcError error;
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
index acdd9fc..37c4c00 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
@@ -34,18 +34,11 @@ import java.util.Optional;
@RequiredArgsConstructor
public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg {
+ @Getter
private final ServerAddress serverAddress;
@Getter
private final ToDeviceRpcRequest msg;
- public ToDeviceRpcRequestActorMsg(ToDeviceRpcRequest msg) {
- this(null, msg);
- }
-
- public Optional<ServerAddress> getServerAddress() {
- return Optional.ofNullable(serverAddress);
- }
-
@Override
public DeviceId getDeviceId() {
return msg.getDeviceId();
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
index 9875312..201f656 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
@@ -34,8 +34,6 @@ import java.util.Optional;
@RequiredArgsConstructor
public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
- private final ServerAddress serverAddress;
-
@Getter
private final TenantId tenantId;
@@ -45,14 +43,6 @@ public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg
@Getter
private final ToServerRpcResponseMsg msg;
- public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) {
- this(null, tenantId, deviceId, msg);
- }
-
- public Optional<ServerAddress> getServerAddress() {
- return Optional.ofNullable(serverAddress);
- }
-
@Override
public MsgType getMsgType() {
return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 90917e1..ac96010 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -56,6 +56,7 @@ enum MessageType {
CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE = 9;
CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
+ CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
}
// Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -121,3 +122,9 @@ message KeyValueProto {
bool boolValue = 7;
}
+message FromDeviceRPCResponseProto {
+ int64 requestIdMSB = 1;
+ int64 requestIdLSB = 2;
+ string response = 3;
+ int32 error = 4;
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java
index cb931f4..cc29891 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java
@@ -42,7 +42,7 @@ public class BasicActorSystemToDeviceSessionActorMsg implements ActorSystemToDev
@Override
public String toString() {
- return "BasicToSessionResponseMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
+ return "BasicActorSystemToDeviceSessionActorMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
}
@Override