thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 187(+115 -72)
application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java 41(+0 -41)
application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java 12(+5 -7)
common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java 36(+26 -10)
common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java 17(+13 -4)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 6(+6 -0)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java 7(+4 -3)
transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java 2(+1 -1)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java 76(+48 -28)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java 26(+20 -6)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 7abcca2..c590104 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,7 +32,6 @@ import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@@ -44,18 +43,33 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.SessionMsgType;
-import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
+import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -76,8 +90,6 @@ import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.thingsboard.server.gen.transport.TransportProtos.*;
-
/**
* @author Andrew Shvayka
*/
@@ -123,11 +135,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
- ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
- rpcSeq++,
- body.getMethod(),
- body.getParams()
- );
+ ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
+ rpcSeq++).setMethodName(body.getMethod()).setParams(body.getParams()).build();
long timeout = request.getExpirationTime() - System.currentTimeMillis();
if (timeout <= 0) {
@@ -136,13 +145,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
boolean sent = rpcSubscriptions.size() > 0;
- Set<SessionId> syncSessionSet = new HashSet<>();
+ Set<UUID> syncSessionSet = new HashSet<>();
rpcSubscriptions.entrySet().forEach(sub -> {
-// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
-// sendMsgToSessionActor(response, sub.getValue().getServer());
-// if (SessionType.SYNC == sub.getValue().getType()) {
-// syncSessionSet.add(sub.getKey());
-// }
+ sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId());
+ if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) {
+ syncSessionSet.add(sub.getKey());
+ }
});
syncSessionSet.forEach(rpcSubscriptions::remove);
@@ -175,10 +183,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
+ private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
+ TransportProtos.SessionType sessionType = getSessionType(sessionId);
if (!toDeviceRpcPendingMap.isEmpty()) {
logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
- if (type == SessionType.SYNC) {
+ if (sessionType == TransportProtos.SessionType.SYNC) {
logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
}
@@ -186,16 +195,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
}
Set<Integer> sentOneWayIds = new HashSet<>();
- if (type == SessionType.ASYNC) {
- toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
+ if (sessionType == TransportProtos.SessionType.ASYNC) {
+ toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
} else {
- toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
+ toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
}
sentOneWayIds.forEach(toDeviceRpcPendingMap::remove);
}
- private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
+ private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
return entry -> {
ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
@@ -204,19 +213,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
sentOneWayIds.add(entry.getKey());
systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
}
- ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
- entry.getKey(),
- body.getMethod(),
- body.getParams()
- );
-// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
-// sendMsgToSessionActor(response, server);
+ ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
+ entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build();
+ sendToTransport(rpcRequest, sessionId, nodeId);
};
}
void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
TransportToDeviceActorMsg msg = wrapper.getMsg();
-// processRpcResponses(context, msg);
if (msg.hasSessionEvent()) {
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
}
@@ -237,15 +241,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
}
-// SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
-// if (sessionMsgType.requiresRulesProcessing()) {
-// switch (sessionMsgType) {
-// case TO_SERVER_RPC_REQUEST:
-// handleClientSideRPCRequest(context, msg);
-// reportActivity();
-// break;
-// }
-// }
+ if (msg.hasToDeviceRPCCallResponse()) {
+ processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse());
+ }
+ if (msg.hasToServerRPCCallRequest()) {
+ handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
+ reportActivity();
+ }
}
private void reportActivity() {
@@ -314,36 +316,42 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
-// private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
-// ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
-//
-// JsonObject json = new JsonObject();
-// json.addProperty("method", request.getMethod());
-// json.add("params", jsonParser.parse(request.getParams()));
-//
-// TbMsgMetaData requestMetaData = defaultMetaData.copy();
-// requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
-// TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
-// PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
-// pushToRuleEngineWithTimeout(context, tbMsg, msgData);
-//
-// scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
-// toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
-// }
+ private void handleClientSideRPCRequest(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg request) {
+ UUID sessionId = getSessionId(sessionInfo);
+ JsonObject json = new JsonObject();
+ json.addProperty("method", request.getMethodName());
+ json.add("params", jsonParser.parse(request.getParams()));
+
+ TbMsgMetaData requestMetaData = defaultMetaData.copy();
+ requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
+
+ scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
+ toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(sessionId, getSessionType(sessionId), sessionInfo.getNodeId()));
+ }
- public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
+ private TransportProtos.SessionType getSessionType(UUID sessionId) {
+ return sessions.containsKey(sessionId) ? TransportProtos.SessionType.ASYNC : TransportProtos.SessionType.SYNC;
+ }
+
+ void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
- ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
-// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+ sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
+ .setRequestId(msg.getId()).setError("timeout").build()
+ , data.getSessionId(), data.getNodeId());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
- ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
+ int requestId = msg.getMsg().getRequestId();
+ ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
if (data != null) {
-// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+ sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
+ .setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
+ , data.getSessionId(), data.getNodeId());
}
}
@@ -382,7 +390,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (hasNotificationData) {
AttributeUpdateNotificationMsg finalNotification = notification.build();
attributeSubscriptions.entrySet().forEach(sub -> {
- sendToTransport(finalNotification, sub.getKey(), sub.getValue());
+ sendToTransport(finalNotification, sub.getKey(), sub.getValue().getNodeId());
});
}
} else {
@@ -390,6 +398,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
+ private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
+ UUID sessionId = getSessionId(sessionInfo);
+ logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
+ ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
+ boolean success = requestMd != null;
+ if (success) {
+ systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+ requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null));
+ } else {
+ logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
+ }
+ }
+
// private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
// SessionId sessionId = msg.getSessionId();
// FromDeviceMsg inMsg = msg.getPayload();
@@ -424,7 +445,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
- UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.remove(sessionId);
@@ -438,8 +459,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
+ private UUID getSessionId(SessionInfoProto sessionInfo) {
+ return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ }
+
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
- UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
@@ -450,11 +475,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.put(sessionId, session);
+ sendPendingRequests(context, sessionId, sessionInfo);
}
}
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
- UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ UUID sessionId = getSessionId(sessionInfo);
if (msg.getEvent() == SessionEvent.OPEN) {
logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
@@ -548,14 +574,31 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
}
- private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) {
+ private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setAttributeUpdateNotification(notificationMsg).build();
- systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+ systemContext.getRuleEngineTransportService().process(nodeId, msg);
}
+ private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
+ DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+ .setSessionIdMSB(sessionId.getMostSignificantBits())
+ .setSessionIdLSB(sessionId.getLeastSignificantBits())
+ .setToDeviceRequest(rpcMsg).build();
+ systemContext.getRuleEngineTransportService().process(nodeId, msg);
+ }
+
+ private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
+ DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+ .setSessionIdMSB(sessionId.getMostSignificantBits())
+ .setSessionIdLSB(sessionId.getLeastSignificantBits())
+ .setToServerResponse(rpcMsg).build();
+ systemContext.getRuleEngineTransportService().process(nodeId, msg);
+ }
+
+
private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
List<TsKvProto> clientAttributes;
if (result == null || result.isEmpty()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
index f82a8c2..669d94b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
@@ -16,18 +16,16 @@
package org.thingsboard.server.actors.device;
import lombok.Data;
-import org.thingsboard.server.common.data.id.SessionId;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.session.SessionType;
+import org.thingsboard.server.gen.transport.TransportProtos;
-import java.util.Optional;
+import java.util.UUID;
/**
* @author Andrew Shvayka
*/
@Data
public class ToServerRpcRequestMetadata {
- private final SessionId sessionId;
- private final SessionType type;
- private final Optional<ServerAddress> server;
+ private final UUID sessionId;
+ private final TransportProtos.SessionType type;
+ private final String nodeId;
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 872dfaa..db7ee88 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSyntaxException;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@@ -95,6 +96,15 @@ public class JsonConverter {
}
}
+ public static JsonElement toJson(TransportProtos.ToDeviceRpcRequestMsg msg, boolean includeRequestId) {
+ JsonObject result = new JsonObject();
+ if (includeRequestId) {
+ result.addProperty("id", msg.getRequestId());
+ }
+ result.addProperty("method", msg.getMethodName());
+ result.add("params", new JsonParser().parse(msg.getParams()));
+ return result;
+ }
private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
JsonObject jo = jsonObject.getAsJsonObject();
@@ -112,14 +122,14 @@ public class JsonConverter {
request.addTsKvList(builder.build());
}
- public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
+ private static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
TsKvListProto.Builder builder = TsKvListProto.newBuilder();
builder.setTs(jo.get("ts").getAsLong());
builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
request.addTsKvList(builder.build());
}
- public static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
+ private static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
List<KeyValueProto> result = new ArrayList<>();
for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
JsonElement element = valueEntry.getValue();
@@ -172,9 +182,9 @@ public class JsonConverter {
return request;
}
- public static ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
+ public static TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
JsonObject object = json.getAsJsonObject();
- return new ToServerRpcRequestMsg(requestId, object.get("method").getAsString(), GSON.toJson(object.get("params")));
+ return TransportProtos.ToServerRpcRequestMsg.newBuilder().setRequestId(requestId).setMethodName(object.get("method").getAsString()).setParams(GSON.toJson(object.get("params"))).build();
}
private static void parseObject(BasicTelemetryUploadRequest request, long systemTs, JsonElement jsonObject) {
@@ -368,8 +378,14 @@ public class JsonConverter {
return result;
}
- public static JsonElement toJson(ToServerRpcResponseMsg msg) {
- return new JsonParser().parse(msg.getData());
+ public static JsonElement toJson(TransportProtos.ToServerRpcResponseMsg msg) {
+ if (StringUtils.isEmpty(msg.getError())) {
+ return new JsonParser().parse(msg.getPayload());
+ } else {
+ JsonObject errorMsg = new JsonObject();
+ errorMsg.addProperty("error", msg.getError());
+ return errorMsg;
+ }
}
public static JsonElement toErrorJson(String errorMsg) {
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
index 17d4a1f..09d1bcb 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,8 +15,11 @@
*/
package org.thingsboard.server.common.transport;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
/**
* Created by ashvayka on 04.10.18.
@@ -26,4 +29,10 @@ public interface SessionMsgListener {
void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
+
+ void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification);
+
+ void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest);
+
+ void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index bf33b1d..cd30419 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.server.common.transport;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@@ -49,6 +51,10 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
+ void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
+
+ void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
+
void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void deregisterSession(SessionInfoProto sessionInfo);
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index 66c0d81..bcf5f52 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -137,6 +137,29 @@ message SubscribeToRPCMsg {
bool unsubscribe = 1;
}
+message ToDeviceRpcRequestMsg {
+ int32 requestId = 1;
+ string methodName = 2;
+ string params = 3;
+}
+
+message ToDeviceRpcResponseMsg {
+ int32 requestId = 1;
+ string payload = 2;
+}
+
+message ToServerRpcRequestMsg {
+ int32 requestId = 1;
+ string methodName = 2;
+ string params = 3;
+}
+
+message ToServerRpcResponseMsg {
+ int32 requestId = 1;
+ string payload = 2;
+ string error = 3;
+}
+
message TransportToDeviceActorMsg {
SessionInfoProto sessionInfo = 1;
SessionEventMsg sessionEvent = 2;
@@ -144,7 +167,9 @@ message TransportToDeviceActorMsg {
PostAttributeMsg postAttributes = 4;
GetAttributeRequestMsg getAttributes = 5;
SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
- SubscribeToRPCMsg subscribeToRPC= 7;
+ SubscribeToRPCMsg subscribeToRPC = 7;
+ ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
+ ToServerRpcRequestMsg toServerRPCCallRequest = 9;
}
message DeviceActorToTransportMsg {
@@ -153,6 +178,8 @@ message DeviceActorToTransportMsg {
SessionCloseNotificationProto sessionCloseNotification = 3;
GetAttributeResponseMsg getAttributesResponse = 4;
AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
+ ToDeviceRpcRequestMsg toDeviceRequest = 6;
+ ToServerRpcResponseMsg toServerResponse = 7;
}
/**
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
index 1c96311..3cd4142 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
@@ -98,7 +98,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
String payload = validatePayload(ctx, inbound);
- return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
+// return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
+ return null;
}
@Override
@@ -225,8 +226,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
private Response convertToServerRpcResponse(SessionContext ctx, ToServerRpcResponseMsg msg) {
if (msg.isSuccess()) {
Response response = new Response(ResponseCode.CONTENT);
- JsonElement result = JsonConverter.toJson(msg);
- response.setPayload(result.toString());
+// JsonElement result = JsonConverter.toJson(msg);
+// response.setPayload(result.toString());
return response;
} else {
return convertError(Optional.of(new RuntimeException("Server RPC response is empty!")));
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index e503409..5269285 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -114,7 +114,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
}
private void reply(ToServerRpcResponseMsg msg) {
- responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
+// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
}
private void reply(AttributesUpdateNotification msg) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index d4ab033..a2be169 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt.adaptors;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import io.netty.buffer.ByteBuf;
@@ -99,6 +98,31 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
+ public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ String topicName = inbound.variableHeader().topicName();
+ try {
+ Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
+ String payload = inbound.payload().toString(UTF8);
+ return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
+ } catch (RuntimeException e) {
+ log.warn("Failed to decode get attributes request", e);
+ throw new AdaptorException(e);
+ }
+ }
+
+ @Override
+ public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ String topicName = inbound.variableHeader().topicName();
+ String payload = validatePayload(ctx.getSessionId(), inbound.payload());
+ try {
+ Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
+ return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
+ } catch (IllegalStateException | JsonSyntaxException ex) {
+ throw new AdaptorException(ex);
+ }
+ }
+
+ @Override
public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
@@ -115,9 +139,17 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
@Override
public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
- return Optional.of(createMqttPublishMsg(ctx,
- MqttTopics.DEVICE_ATTRIBUTES_TOPIC,
- JsonConverter.toJson(notificationMsg)));
+ return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg)));
+ }
+
+ @Override
+ public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
+ return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
+ }
+
+ @Override
+ public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
+ return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
}
@Override
@@ -149,7 +181,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound);
break;
case TO_SERVER_RPC_REQUEST:
- msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);
+ msg = null;//convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);
break;
default:
log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type);
@@ -181,13 +213,12 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
break;
case TO_DEVICE_RPC_REQUEST:
ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
- result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
- rpcRequest);
+ result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), rpcRequest);
break;
case TO_SERVER_RPC_RESPONSE:
- ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
- result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
- rpcResponse);
+// ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
+// result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
+// rpcResponse);
break;
case RULE_ENGINE_ERROR:
RuleEngineErrorMsg errorMsg = (RuleEngineErrorMsg) msg;
@@ -232,7 +263,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
}
- private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
+ private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
}
@@ -290,7 +321,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
- return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());
+ return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId());
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
@@ -299,18 +330,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
- return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId());
- } catch (IllegalStateException | JsonSyntaxException ex) {
- throw new AdaptorException(ex);
- }
- }
-
- private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
- String topicName = inbound.variableHeader().topicName();
- String payload = validatePayload(ctx.getSessionId(), inbound.payload());
- try {
- Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
- return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
+ return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId());
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 602fde1..48173f9 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -19,7 +19,15 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.thingsboard.server.common.transport.TransportAdaptor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import java.util.Optional;
@@ -29,15 +37,21 @@ import java.util.Optional;
*/
public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
- TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+ PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
- TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+ PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
- TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+ GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
- Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
+ ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
- Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
+ ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
+ Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
+ Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
+
+ Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
+
+ Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse);
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index b1ee037..3099b24 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -223,43 +223,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
+ } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)){
+ TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
+ transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
+ } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)){
+ TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
+ transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
}
-// AdaptorToSessionActorMsg msg = null;
-// try {
-// if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
-// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
-// } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
-// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
-// } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
-// msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
-// if (msgId >= 0) {
-// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-// }
-// } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
-// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
-// if (msgId >= 0) {
-// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-// }
-// } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
-// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
-// if (msgId >= 0) {
-// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-// }
-// }
-// } catch (AdaptorException e) {
-// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
-// }
-// if (msg != null) {
-// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-// } else {
-// log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
-// ctx.close();
-// }
}
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
@@ -555,4 +530,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
}
}
+
+ @Override
+ public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
+ log.trace("[{}] Received the remote command to close the session", sessionId);
+ processDisconnect(deviceSessionCtx.getChannel());
+ }
+
+ @Override
+ public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
+ log.trace("[{}] Received RPC command to device", sessionId);
+ try {
+ adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+ } catch (Exception e) {
+ log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e);
+ }
+ }
+
+ @Override
+ public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
+ log.trace("[{}] Received RPC command to device", sessionId);
+ try {
+ adaptor.convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+ } catch (Exception e) {
+ log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e);
+ }
+ }
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 648b01c..1539238 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -164,6 +164,15 @@ public class MqttTransportService implements TransportService {
if (toSessionMsg.hasAttributeUpdateNotification()) {
listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
}
+ if (toSessionMsg.hasSessionCloseNotification()) {
+ listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
+ }
+ if (toSessionMsg.hasToDeviceRequest()) {
+ listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
+ }
+ if (toSessionMsg.hasToServerResponse()) {
+ listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
+ }
});
} else {
//TODO: should we notify the device actor about missed session?
@@ -273,6 +282,24 @@ public class MqttTransportService implements TransportService {
}
@Override
+ public void process(SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setToDeviceRPCCallResponse(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setToServerRPCCallRequest(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+
+ @Override
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);
//TODO: monitor sessions periodically: PING REQ/RESP, etc.