thingsboard-developers

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 0822766..7abcca2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -216,12 +216,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
         TransportToDeviceActorMsg msg = wrapper.getMsg();
-//        processSubscriptionCommands(context, msg);
 //        processRpcResponses(context, msg);
         if (msg.hasSessionEvent()) {
             processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
         }
-
+        if (msg.hasSubscribeToAttributes()) {
+            processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes());
+        }
+        if (msg.hasSubscribeToRPC()) {
+            processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC());
+        }
         if (msg.hasPostAttributes()) {
             handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
             reportActivity();
@@ -236,9 +240,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 //        SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
 //        if (sessionMsgType.requiresRulesProcessing()) {
 //            switch (sessionMsgType) {
-//                case GET_ATTRIBUTES_REQUEST:
-//                    handleGetAttributesRequest(msg);
-//                    break;
 //                case TO_SERVER_RPC_REQUEST:
 //                    handleClientSideRPCRequest(context, msg);
 //                    reportActivity();
@@ -262,7 +263,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
         ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
         ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
-        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
         int requestId = request.getRequestId();
         Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
             @Override
@@ -272,7 +272,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                         .addAllClientAttributeList(toTsKvProtos(result.get(0)))
                         .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
                         .build();
-                sendToTransport(responseMsg, sessionId, sessionInfo);
+                sendToTransport(responseMsg, sessionInfo);
             }
 
             @Override
@@ -280,7 +280,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                 GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
                         .setError(t.getMessage())
                         .build();
-                sendToTransport(responseMsg, sessionId, sessionInfo);
+                sendToTransport(responseMsg, sessionInfo);
             }
         });
     }
@@ -353,28 +353,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
         if (attributeSubscriptions.size() > 0) {
-            ToDeviceMsg notification = null;
+            boolean hasNotificationData = false;
+            AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
             if (msg.isDeleted()) {
-                List<AttributeKey> sharedKeys = msg.getDeletedKeys().stream()
+                List<String> sharedKeys = msg.getDeletedKeys().stream()
                         .filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
+                        .map(AttributeKey::getAttributeKey)
                         .collect(Collectors.toList());
-                notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromDeleted(sharedKeys));
+                if (!sharedKeys.isEmpty()) {
+                    notification.addAllSharedDeleted(sharedKeys);
+                    hasNotificationData = true;
+                }
             } else {
                 if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
                     List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
                     if (attributes.size() > 0) {
-                        notification = new AttributesUpdateNotification(BasicAttributeKVMsg.fromShared(attributes));
+                        List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto)
+                                .collect(Collectors.toList());
+                        if (!sharedUpdated.isEmpty()) {
+                            notification.addAllSharedUpdated(sharedUpdated);
+                            hasNotificationData = true;
+                        }
                     } else {
                         logger.debug("[{}] No public server side attributes changed!", deviceId);
                     }
                 }
             }
-            if (notification != null) {
-                ToDeviceMsg finalNotification = notification;
-//                attributeSubscriptions.entrySet().forEach(sub -> {
-//                    ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
-//                    sendMsgToSessionActor(response, sub.getValue().getServer());
-//                });
+            if (hasNotificationData) {
+                AttributeUpdateNotificationMsg finalNotification = notification.build();
+                attributeSubscriptions.entrySet().forEach(sub -> {
+                    sendToTransport(finalNotification, sub.getKey(), sub.getValue());
+                });
             }
         } else {
             logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
@@ -414,25 +423,35 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 //        }
     }
 
-//    private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
-//        SessionId sessionId = msg.getSessionId();
-//        SessionType sessionType = msg.getSessionType();
-//        FromDeviceMsg inMsg = msg.getPayload();
-//        if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
-//            logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
-//            attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
-//        } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
-//            logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
-//            attributeSubscriptions.remove(sessionId);
-//        } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
-//            logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
-//            rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
-//            sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
-//        } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
-//            logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
-//            rpcSubscriptions.remove(sessionId);
-//        }
-//    }
+    private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
+        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        if (subscribeCmd.getUnsubscribe()) {
+            logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
+            attributeSubscriptions.remove(sessionId);
+        } else {
+            SessionInfo session = sessions.get(sessionId);
+            if (session == null) {
+                session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+            }
+            logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
+            attributeSubscriptions.put(sessionId, session);
+        }
+    }
+
+    private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
+        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        if (subscribeCmd.getUnsubscribe()) {
+            logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
+            rpcSubscriptions.remove(sessionId);
+        } else {
+            SessionInfo session = sessions.get(sessionId);
+            if (session == null) {
+                session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+            }
+            logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
+            rpcSubscriptions.put(sessionId, session);
+        }
+    }
 
     private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
         UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
@@ -521,11 +540,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) {
+    private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
+        DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+                .setSessionIdMSB(sessionInfo.getSessionIdMSB())
+                .setSessionIdLSB(sessionInfo.getSessionIdLSB())
+                .setGetAttributesResponse(responseMsg).build();
+        systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+    }
+
+    private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) {
         DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
                 .setSessionIdMSB(sessionId.getMostSignificantBits())
                 .setSessionIdLSB(sessionId.getLeastSignificantBits())
-                .setGetAttributesResponse(responseMsg).build();
+                .setAttributeUpdateNotification(notificationMsg).build();
         systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 32cb60d..43ae592 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -25,5 +25,4 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
 public class SessionInfo {
     private final SessionType type;
     private final String nodeId;
-
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 6bb8eff..872dfaa 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.*;
 
 import java.util.ArrayList;
@@ -153,20 +154,6 @@ public class JsonConverter {
         return result;
     }
 
-    private static void parseNumericProto(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
-        if (value.getAsString().contains(".")) {
-            result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
-        } else {
-            try {
-                long longValue = Long.parseLong(value.getAsString());
-                result.add(new LongDataEntry(valueEntry.getKey(), longValue));
-            } catch (NumberFormatException e) {
-                throw new JsonSyntaxException("Big integer values are not supported!");
-            }
-        }
-    }
-
-
     private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
         BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
         if (jsonObject.isJsonObject()) {
@@ -283,6 +270,19 @@ public class JsonConverter {
         return result;
     }
 
+    public static JsonElement toJson(AttributeUpdateNotificationMsg payload) {
+        JsonObject result = new JsonObject();
+        if (payload.getSharedUpdatedCount() > 0) {
+            payload.getSharedUpdatedList().forEach(addToObjectFromProto(result));
+        }
+        if (payload.getSharedDeletedCount() > 0) {
+            JsonArray attrObject = new JsonArray();
+            payload.getSharedDeletedList().forEach(attrObject::add);
+            result.add("deleted", attrObject);
+        }
+        return result;
+    }
+
     public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
         JsonObject result = new JsonObject();
         if (asMap) {
@@ -377,4 +377,5 @@ public class JsonConverter {
         error.addProperty("error", errorMsg);
         return error;
     }
+
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
index 44fcd2a..17d4a1f 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.common.transport;
 
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
 
 /**
@@ -23,4 +24,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse
 public interface SessionMsgListener {
 
     void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
+
+    void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index c3817a9..bf33b1d 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.server.common.transport;
 
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
@@ -43,6 +45,10 @@ public interface TransportService {
 
     void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
 
+    void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
+
+    void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
+
     void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
 
     void deregisterSession(SessionInfoProto sessionInfo);
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index d32bac9..66c0d81 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -108,6 +108,11 @@ message GetAttributeResponseMsg {
   string error = 5;
 }
 
+message AttributeUpdateNotificationMsg {
+  repeated TsKvProto sharedUpdated = 1;
+  repeated string sharedDeleted = 2;
+}
+
 message ValidateDeviceTokenRequestMsg {
   string token = 1;
 }
@@ -124,12 +129,22 @@ message SessionCloseNotificationProto {
   string message = 1;
 }
 
+message SubscribeToAttributeUpdatesMsg {
+  bool unsubscribe = 1;
+}
+
+message SubscribeToRPCMsg {
+  bool unsubscribe = 1;
+}
+
 message TransportToDeviceActorMsg {
   SessionInfoProto sessionInfo = 1;
   SessionEventMsg sessionEvent = 2;
   PostTelemetryMsg postTelemetry = 3;
   PostAttributeMsg postAttributes = 4;
   GetAttributeRequestMsg getAttributes = 5;
+  SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
+  SubscribeToRPCMsg subscribeToRPC= 7;
 }
 
 message DeviceActorToTransportMsg {
@@ -137,6 +152,7 @@ message DeviceActorToTransportMsg {
    int64 sessionIdLSB = 2;
    SessionCloseNotificationProto sessionCloseNotification = 3;
    GetAttributeResponseMsg getAttributesResponse = 4;
+   AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
 }
 
 /**
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index ee41d2c..d4ab033 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -114,6 +114,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     }
 
     @Override
+    public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
+        return Optional.of(createMqttPublishMsg(ctx,
+                MqttTopics.DEVICE_ATTRIBUTES_TOPIC,
+                JsonConverter.toJson(notificationMsg)));
+    }
+
+    @Override
     public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {
         FromDeviceMsg msg;
         switch (type) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 54e7c5c..602fde1 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -36,4 +36,8 @@ public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx,
     TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
 
     Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
+
+    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
+
+
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index c64ab5d..b1ee037 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -71,6 +71,7 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEP
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
 import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
@@ -148,16 +149,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             case UNSUBSCRIBE:
                 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                 break;
-//            case PINGREQ:
-//                if (checkConnected(ctx)) {
-//                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
-//                }
-//                break;
-//            case DISCONNECT:
-//                if (checkConnected(ctx)) {
-//                    processDisconnect(ctx);
-//                }
-//                break;
+            case PINGREQ:
+                //TODO: should we push the notification to the rule engine?
+                if (checkConnected(ctx)) {
+                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+                }
+                break;
+            case DISCONNECT:
+                if (checkConnected(ctx)) {
+                    processDisconnect(ctx);
+                }
+                break;
             default:
                 break;
         }
@@ -289,25 +291,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             MqttQoS reqQoS = subscription.qualityOfService();
             try {
                 switch (topic) {
-//                    case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
-//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-//                        registerSubQoS(topic, grantedQoSList, reqQoS);
-//                        break;
-//                    }
-//                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
-//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-//                        registerSubQoS(topic, grantedQoSList, reqQoS);
-//                        break;
-//                    }
-//                    case DEVICE_RPC_RESPONSE_SUB_TOPIC:
-//                    case GATEWAY_ATTRIBUTES_TOPIC:
-//                    case GATEWAY_RPC_TOPIC:
-//                        registerSubQoS(topic, grantedQoSList, reqQoS);
-//                        break;
+                    case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
+                        transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    }
+                    case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+                        transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    }
+                    case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
+                    case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
+                    case MqttTopics.GATEWAY_RPC_TOPIC:
                     case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-                        deviceSessionCtx.setAllowAttributeResponses();
                         registerSubQoS(topic, grantedQoSList, reqQoS);
                         break;
                     default:
@@ -337,19 +334,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             mqttQoSMap.remove(topicName);
             try {
                 switch (topicName) {
-//                    case DEVICE_ATTRIBUTES_TOPIC: {
-//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-//                        break;
-//                    }
-//                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
-//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-//                        break;
-//                    }
-                    case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-                        deviceSessionCtx.setDisallowAttributeResponses();
+                    case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
+                        transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
+                        break;
+                    }
+                    case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+                        transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
                         break;
+                    }
                 }
             } catch (Exception e) {
                 log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
@@ -551,7 +543,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         try {
             adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
         } catch (Exception e) {
-            log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e);
+            log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
+        }
+    }
+
+    @Override
+    public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) {
+        try {
+            adaptor.convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+        } catch (Exception e) {
+            log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
         }
     }
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index d704634..6bbb9fa 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -44,7 +44,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
     private final MqttSessionId sessionId;
     @Getter
     private ChannelHandlerContext channel;
-    private volatile boolean allowAttributeResponses;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
     public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
@@ -103,14 +102,6 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
         this.channel = channel;
     }
 
-    public void setAllowAttributeResponses() {
-        allowAttributeResponses = true;
-    }
-
-    public void setDisallowAttributeResponses() {
-        allowAttributeResponses = false;
-    }
-
     public int nextMsgId() {
         return msgIdSeq.incrementAndGet();
     }
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 7a2f698..648b01c 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -161,6 +161,9 @@ public class MqttTransportService implements TransportService {
                                         if (toSessionMsg.hasGetAttributesResponse()) {
                                             listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
                                         }
+                                        if (toSessionMsg.hasAttributeUpdateNotification()) {
+                                            listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
+                                        }
                                     });
                                 } else {
                                     //TODO: should we notify the device actor about missed session?
@@ -252,6 +255,24 @@ public class MqttTransportService implements TransportService {
     }
 
     @Override
+    public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSubscribeToAttributes(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSubscribeToRPC(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
+
+    @Override
     public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
         sessions.putIfAbsent(toId(sessionInfo), listener);
         //TODO: monitor sessions periodically: PING REQ/RESP, etc.