thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 113(+70 -43)
common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java 37(+19 -18)
common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java 11(+7 -4)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 6(+6 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java 15(+11 -4)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java 4(+4 -0)
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 91(+46 -45)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 0822766..7abcca2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -216,12 +216,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
TransportToDeviceActorMsg msg = wrapper.getMsg();
-// processSubscriptionCommands(context, msg);
// processRpcResponses(context, msg);
if (msg.hasSessionEvent()) {
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
}
-
+ if (msg.hasSubscribeToAttributes()) {
+ processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes());
+ }
+ if (msg.hasSubscribeToRPC()) {
+ processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC());
+ }
if (msg.hasPostAttributes()) {
handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
reportActivity();
@@ -236,9 +240,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
// SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
// if (sessionMsgType.requiresRulesProcessing()) {
// switch (sessionMsgType) {
-// case GET_ATTRIBUTES_REQUEST:
-// handleGetAttributesRequest(msg);
-// break;
// case TO_SERVER_RPC_REQUEST:
// handleClientSideRPCRequest(context, msg);
// reportActivity();
@@ -262,7 +263,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
- UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
int requestId = request.getRequestId();
Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
@Override
@@ -272,7 +272,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
.addAllClientAttributeList(toTsKvProtos(result.get(0)))
.addAllSharedAttributeList(toTsKvProtos(result.get(1)))
.build();
- sendToTransport(responseMsg, sessionId, sessionInfo);
+ sendToTransport(responseMsg, sessionInfo);
}
@Override
@@ -280,7 +280,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
.setError(t.getMessage())
.build();
- sendToTransport(responseMsg, sessionId, sessionInfo);
+ sendToTransport(responseMsg, sessionInfo);
}
});
}
@@ -353,28 +353,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
if (attributeSubscriptions.size() > 0) {
- ToDeviceMsg notification = null;
+ boolean hasNotificationData = false;
+ AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
if (msg.isDeleted()) {
- List<AttributeKey> sharedKeys = msg.getDeletedKeys().stream()
+ List<String> sharedKeys = msg.getDeletedKeys().stream()
.filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
+ .map(AttributeKey::getAttributeKey)
.collect(Collectors.toList());
- notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromDeleted(sharedKeys));
+ if (!sharedKeys.isEmpty()) {
+ notification.addAllSharedDeleted(sharedKeys);
+ hasNotificationData = true;
+ }
} else {
if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
if (attributes.size() > 0) {
- notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromShared(attributes));
+ List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto)
+ .collect(Collectors.toList());
+ if (!sharedUpdated.isEmpty()) {
+ notification.addAllSharedUpdated(sharedUpdated);
+ hasNotificationData = true;
+ }
} else {
logger.debug("[{}] No public server side attributes changed!", deviceId);
}
}
}
- if (notification != null) {
- ToDeviceMsg finalNotification = notification;
-// attributeSubscriptions.entrySet().forEach(sub -> {
-// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
-// sendMsgToSessionActor(response, sub.getValue().getServer());
-// });
+ if (hasNotificationData) {
+ AttributeUpdateNotificationMsg finalNotification = notification.build();
+ attributeSubscriptions.entrySet().forEach(sub -> {
+ sendToTransport(finalNotification, sub.getKey(), sub.getValue());
+ });
}
} else {
logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
@@ -414,25 +423,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
// }
}
-// private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
-// SessionId sessionId = msg.getSessionId();
-// SessionType sessionType = msg.getSessionType();
-// FromDeviceMsg inMsg = msg.getPayload();
-// if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
-// logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
-// attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
-// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
-// logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
-// attributeSubscriptions.remove(sessionId);
-// } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
-// logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
-// rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
-// sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
-// } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
-// logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
-// rpcSubscriptions.remove(sessionId);
-// }
-// }
+ private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
+ UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ if (subscribeCmd.getUnsubscribe()) {
+ logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
+ attributeSubscriptions.remove(sessionId);
+ } else {
+ SessionInfo session = sessions.get(sessionId);
+ if (session == null) {
+ session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+ }
+ logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
+ attributeSubscriptions.put(sessionId, session);
+ }
+ }
+
+ private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
+ UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+ if (subscribeCmd.getUnsubscribe()) {
+ logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
+ rpcSubscriptions.remove(sessionId);
+ } else {
+ SessionInfo session = sessions.get(sessionId);
+ if (session == null) {
+ session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+ }
+ logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
+ rpcSubscriptions.put(sessionId, session);
+ }
+ }
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
@@ -521,11 +540,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) {
+ private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
+ DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+ .setSessionIdMSB(sessionInfo.getSessionIdMSB())
+ .setSessionIdLSB(sessionInfo.getSessionIdLSB())
+ .setGetAttributesResponse(responseMsg).build();
+ systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+ }
+
+ private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) {
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
- .setGetAttributesResponse(responseMsg).build();
+ .setAttributeUpdateNotification(notificationMsg).build();
systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 32cb60d..43ae592 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -25,5 +25,4 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
public class SessionInfo {
private final SessionType type;
private final String nodeId;
-
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 6bb8eff..872dfaa 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import java.util.ArrayList;
@@ -153,20 +154,6 @@ public class JsonConverter {
return result;
}
- private static void parseNumericProto(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
- if (value.getAsString().contains(".")) {
- result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
- } else {
- try {
- long longValue = Long.parseLong(value.getAsString());
- result.add(new LongDataEntry(valueEntry.getKey(), longValue));
- } catch (NumberFormatException e) {
- throw new JsonSyntaxException("Big integer values are not supported!");
- }
- }
- }
-
-
private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
if (jsonObject.isJsonObject()) {
@@ -283,6 +270,19 @@ public class JsonConverter {
return result;
}
+ public static JsonElement toJson(AttributeUpdateNotificationMsg payload) {
+ JsonObject result = new JsonObject();
+ if (payload.getSharedUpdatedCount() > 0) {
+ payload.getSharedUpdatedList().forEach(addToObjectFromProto(result));
+ }
+ if (payload.getSharedDeletedCount() > 0) {
+ JsonArray attrObject = new JsonArray();
+ payload.getSharedDeletedList().forEach(attrObject::add);
+ result.add("deleted", attrObject);
+ }
+ return result;
+ }
+
public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
JsonObject result = new JsonObject();
if (asMap) {
@@ -377,4 +377,5 @@ public class JsonConverter {
error.addProperty("error", errorMsg);
return error;
}
+
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
index 44fcd2a..17d4a1f 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.transport;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
/**
@@ -23,4 +24,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse
public interface SessionMsgListener {
void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
+
+ void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index c3817a9..bf33b1d 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.server.common.transport;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
@@ -43,6 +45,10 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
+ void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
+
+ void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
+
void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void deregisterSession(SessionInfoProto sessionInfo);
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index d32bac9..66c0d81 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -108,6 +108,11 @@ message GetAttributeResponseMsg {
string error = 5;
}
+message AttributeUpdateNotificationMsg {
+ repeated TsKvProto sharedUpdated = 1;
+ repeated string sharedDeleted = 2;
+}
+
message ValidateDeviceTokenRequestMsg {
string token = 1;
}
@@ -124,12 +129,22 @@ message SessionCloseNotificationProto {
string message = 1;
}
+message SubscribeToAttributeUpdatesMsg {
+ bool unsubscribe = 1;
+}
+
+message SubscribeToRPCMsg {
+ bool unsubscribe = 1;
+}
+
message TransportToDeviceActorMsg {
SessionInfoProto sessionInfo = 1;
SessionEventMsg sessionEvent = 2;
PostTelemetryMsg postTelemetry = 3;
PostAttributeMsg postAttributes = 4;
GetAttributeRequestMsg getAttributes = 5;
+ SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
+ SubscribeToRPCMsg subscribeToRPC= 7;
}
message DeviceActorToTransportMsg {
@@ -137,6 +152,7 @@ message DeviceActorToTransportMsg {
int64 sessionIdLSB = 2;
SessionCloseNotificationProto sessionCloseNotification = 3;
GetAttributeResponseMsg getAttributesResponse = 4;
+ AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
}
/**
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index ee41d2c..d4ab033 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -114,6 +114,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
+ public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
+ return Optional.of(createMqttPublishMsg(ctx,
+ MqttTopics.DEVICE_ATTRIBUTES_TOPIC,
+ JsonConverter.toJson(notificationMsg)));
+ }
+
+ @Override
public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {
FromDeviceMsg msg;
switch (type) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 54e7c5c..602fde1 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -36,4 +36,8 @@ public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx,
TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
+
+ Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
+
+
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index c64ab5d..b1ee037 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -71,6 +71,7 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEP
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
@@ -148,16 +149,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
-// case PINGREQ:
-// if (checkConnected(ctx)) {
-// ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
-// }
-// break;
-// case DISCONNECT:
-// if (checkConnected(ctx)) {
-// processDisconnect(ctx);
-// }
-// break;
+ case PINGREQ:
+ //TODO: should we push the notification to the rule engine?
+ if (checkConnected(ctx)) {
+ ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+ }
+ break;
+ case DISCONNECT:
+ if (checkConnected(ctx)) {
+ processDisconnect(ctx);
+ }
+ break;
default:
break;
}
@@ -289,25 +291,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
MqttQoS reqQoS = subscription.qualityOfService();
try {
switch (topic) {
-// case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
-// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-// registerSubQoS(topic, grantedQoSList, reqQoS);
-// break;
-// }
-// case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
-// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-// registerSubQoS(topic, grantedQoSList, reqQoS);
-// break;
-// }
-// case DEVICE_RPC_RESPONSE_SUB_TOPIC:
-// case GATEWAY_ATTRIBUTES_TOPIC:
-// case GATEWAY_RPC_TOPIC:
-// registerSubQoS(topic, grantedQoSList, reqQoS);
-// break;
+ case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
+ transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
+ registerSubQoS(topic, grantedQoSList, reqQoS);
+ break;
+ }
+ case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+ transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
+ registerSubQoS(topic, grantedQoSList, reqQoS);
+ break;
+ }
+ case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
+ case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
+ case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
- deviceSessionCtx.setAllowAttributeResponses();
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
@@ -337,19 +334,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
mqttQoSMap.remove(topicName);
try {
switch (topicName) {
-// case DEVICE_ATTRIBUTES_TOPIC: {
-// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-// break;
-// }
-// case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
-// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-// break;
-// }
- case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
- deviceSessionCtx.setDisallowAttributeResponses();
+ case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
+ transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
+ break;
+ }
+ case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+ transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
break;
+ }
}
} catch (Exception e) {
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
@@ -551,7 +543,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
- log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e);
+ log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
+ }
+ }
+
+ @Override
+ public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) {
+ try {
+ adaptor.convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+ } catch (Exception e) {
+ log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
}
}
}
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index d704634..6bbb9fa 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -44,7 +44,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private final MqttSessionId sessionId;
@Getter
private ChannelHandlerContext channel;
- private volatile boolean allowAttributeResponses;
private AtomicInteger msgIdSeq = new AtomicInteger(0);
public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
@@ -103,14 +102,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
this.channel = channel;
}
- public void setAllowAttributeResponses() {
- allowAttributeResponses = true;
- }
-
- public void setDisallowAttributeResponses() {
- allowAttributeResponses = false;
- }
-
public int nextMsgId() {
return msgIdSeq.incrementAndGet();
}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 7a2f698..648b01c 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -161,6 +161,9 @@ public class MqttTransportService implements TransportService {
if (toSessionMsg.hasGetAttributesResponse()) {
listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
}
+ if (toSessionMsg.hasAttributeUpdateNotification()) {
+ listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
+ }
});
} else {
//TODO: should we notify the device actor about missed session?
@@ -252,6 +255,24 @@ public class MqttTransportService implements TransportService {
}
@Override
+ public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscribeToAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+
+ @Override
+ public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscribeToRPC(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
+
+ @Override
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);
//TODO: monitor sessions periodically: PING REQ/RESP, etc.