thingsboard-memoizeit

MQTT API implementation

10/11/2018 8:04:16 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 7abcca2..c590104 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,7 +32,6 @@ import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@@ -44,18 +43,33 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
 import org.thingsboard.server.common.msg.core.RuleEngineError;
 import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
-import org.thingsboard.server.common.msg.session.SessionType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
+import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -76,8 +90,6 @@ import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import org.thingsboard.server.gen.transport.TransportProtos.*;
-
 /**
  * @author Andrew Shvayka
  */
@@ -123,11 +135,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) {
         ToDeviceRpcRequest request = msg.getMsg();
         ToDeviceRpcRequestBody body = request.getBody();
-        ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
-                rpcSeq++,
-                body.getMethod(),
-                body.getParams()
-        );
+        ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
+                rpcSeq++).setMethodName(body.getMethod()).setParams(body.getParams()).build();
 
         long timeout = request.getExpirationTime() - System.currentTimeMillis();
         if (timeout <= 0) {
@@ -136,13 +145,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
 
         boolean sent = rpcSubscriptions.size() > 0;
-        Set<SessionId> syncSessionSet = new HashSet<>();
+        Set<UUID> syncSessionSet = new HashSet<>();
         rpcSubscriptions.entrySet().forEach(sub -> {
-//            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
-//            sendMsgToSessionActor(response, sub.getValue().getServer());
-//            if (SessionType.SYNC == sub.getValue().getType()) {
-//                syncSessionSet.add(sub.getKey());
-//            }
+            sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId());
+            if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) {
+                syncSessionSet.add(sub.getKey());
+            }
         });
         syncSessionSet.forEach(rpcSubscriptions::remove);
 
@@ -175,10 +183,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
+    private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
+        TransportProtos.SessionType sessionType = getSessionType(sessionId);
         if (!toDeviceRpcPendingMap.isEmpty()) {
             logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
-            if (type == SessionType.SYNC) {
+            if (sessionType == TransportProtos.SessionType.SYNC) {
                 logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
                 rpcSubscriptions.remove(sessionId);
             }
@@ -186,16 +195,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
         }
         Set<Integer> sentOneWayIds = new HashSet<>();
-        if (type == SessionType.ASYNC) {
-            toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
+        if (sessionType == TransportProtos.SessionType.ASYNC) {
+            toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
         } else {
-            toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
+            toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
         }
 
         sentOneWayIds.forEach(toDeviceRpcPendingMap::remove);
     }
 
-    private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
+    private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
         return entry -> {
             ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
             ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
@@ -204,19 +213,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                 sentOneWayIds.add(entry.getKey());
                 systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
             }
-            ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
-                    entry.getKey(),
-                    body.getMethod(),
-                    body.getParams()
-            );
-//            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
-//            sendMsgToSessionActor(response, server);
+            ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
+                    entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build();
+            sendToTransport(rpcRequest, sessionId, nodeId);
         };
     }
 
     void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
         TransportToDeviceActorMsg msg = wrapper.getMsg();
-//        processRpcResponses(context, msg);
         if (msg.hasSessionEvent()) {
             processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
         }
@@ -237,15 +241,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (msg.hasGetAttributes()) {
             handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
         }
-//        SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
-//        if (sessionMsgType.requiresRulesProcessing()) {
-//            switch (sessionMsgType) {
-//                case TO_SERVER_RPC_REQUEST:
-//                    handleClientSideRPCRequest(context, msg);
-//                    reportActivity();
-//                    break;
-//            }
-//        }
+        if (msg.hasToDeviceRPCCallResponse()) {
+            processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse());
+        }
+        if (msg.hasToServerRPCCallRequest()) {
+            handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
+            reportActivity();
+        }
     }
 
     private void reportActivity() {
@@ -314,36 +316,42 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-//    private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
-//        ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
-//
-//        JsonObject json = new JsonObject();
-//        json.addProperty("method", request.getMethod());
-//        json.add("params", jsonParser.parse(request.getParams()));
-//
-//        TbMsgMetaData requestMetaData = defaultMetaData.copy();
-//        requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
-//        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
-//        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
-//        pushToRuleEngineWithTimeout(context, tbMsg, msgData);
-//
-//        scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
-//        toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
-//    }
+    private void handleClientSideRPCRequest(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg request) {
+        UUID sessionId = getSessionId(sessionInfo);
+        JsonObject json = new JsonObject();
+        json.addProperty("method", request.getMethodName());
+        json.add("params", jsonParser.parse(request.getParams()));
+
+        TbMsgMetaData requestMetaData = defaultMetaData.copy();
+        requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+        context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
+
+        scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
+        toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(sessionId, getSessionType(sessionId), sessionInfo.getNodeId()));
+    }
 
-    public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
+    private TransportProtos.SessionType getSessionType(UUID sessionId) {
+        return sessions.containsKey(sessionId) ? TransportProtos.SessionType.ASYNC : TransportProtos.SessionType.SYNC;
+    }
+
+    void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
         if (data != null) {
             logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
-            ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
-//            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+            sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
+                            .setRequestId(msg.getId()).setError("timeout").build()
+                    , data.getSessionId(), data.getNodeId());
         }
     }
 
     void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
-        ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
+        int requestId = msg.getMsg().getRequestId();
+        ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
         if (data != null) {
-//            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+            sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
+                            .setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
+                    , data.getSessionId(), data.getNodeId());
         }
     }
 
@@ -382,7 +390,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             if (hasNotificationData) {
                 AttributeUpdateNotificationMsg finalNotification = notification.build();
                 attributeSubscriptions.entrySet().forEach(sub -> {
-                    sendToTransport(finalNotification, sub.getKey(), sub.getValue());
+                    sendToTransport(finalNotification, sub.getKey(), sub.getValue().getNodeId());
                 });
             }
         } else {
@@ -390,6 +398,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
+    private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
+        UUID sessionId = getSessionId(sessionInfo);
+        logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
+        ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
+        boolean success = requestMd != null;
+        if (success) {
+            systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+                    requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null));
+        } else {
+            logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
+        }
+    }
+
 //    private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
 //        SessionId sessionId = msg.getSessionId();
 //        FromDeviceMsg inMsg = msg.getPayload();
@@ -424,7 +445,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     }
 
     private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
-        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        UUID sessionId = getSessionId(sessionInfo);
         if (subscribeCmd.getUnsubscribe()) {
             logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
             attributeSubscriptions.remove(sessionId);
@@ -438,8 +459,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
+    private UUID getSessionId(SessionInfoProto sessionInfo) {
+        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+    }
+
     private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
-        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        UUID sessionId = getSessionId(sessionInfo);
         if (subscribeCmd.getUnsubscribe()) {
             logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
             rpcSubscriptions.remove(sessionId);
@@ -450,11 +475,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             }
             logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
             rpcSubscriptions.put(sessionId, session);
+            sendPendingRequests(context, sessionId, sessionInfo);
         }
     }
 
     private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
-        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        UUID sessionId = getSessionId(sessionInfo);
         if (msg.getEvent() == SessionEvent.OPEN) {
             logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
             if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
@@ -548,14 +574,31 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
     }
 
-    private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) {
+    private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
         DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
                 .setSessionIdMSB(sessionId.getMostSignificantBits())
                 .setSessionIdLSB(sessionId.getLeastSignificantBits())
                 .setAttributeUpdateNotification(notificationMsg).build();
-        systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+        systemContext.getRuleEngineTransportService().process(nodeId, msg);
     }
 
+    private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
+        DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+                .setSessionIdMSB(sessionId.getMostSignificantBits())
+                .setSessionIdLSB(sessionId.getLeastSignificantBits())
+                .setToDeviceRequest(rpcMsg).build();
+        systemContext.getRuleEngineTransportService().process(nodeId, msg);
+    }
+
+    private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
+        DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+                .setSessionIdMSB(sessionId.getMostSignificantBits())
+                .setSessionIdLSB(sessionId.getLeastSignificantBits())
+                .setToServerResponse(rpcMsg).build();
+        systemContext.getRuleEngineTransportService().process(nodeId, msg);
+    }
+
+
     private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
         List<TsKvProto> clientAttributes;
         if (result == null || result.isEmpty()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
index f82a8c2..669d94b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
@@ -16,18 +16,16 @@
 package org.thingsboard.server.actors.device;
 
 import lombok.Data;
-import org.thingsboard.server.common.data.id.SessionId;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.session.SessionType;
+import org.thingsboard.server.gen.transport.TransportProtos;
 
-import java.util.Optional;
+import java.util.UUID;
 
 /**
  * @author Andrew Shvayka
  */
 @Data
 public class ToServerRpcRequestMetadata {
-    private final SessionId sessionId;
-    private final SessionType type;
-    private final Optional<ServerAddress> server;
+    private final UUID sessionId;
+    private final TransportProtos.SessionType type;
+    private final String nodeId;
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 872dfaa..db7ee88 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSyntaxException;
+import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@@ -95,6 +96,15 @@ public class JsonConverter {
         }
     }
 
+    public static JsonElement toJson(TransportProtos.ToDeviceRpcRequestMsg msg, boolean includeRequestId) {
+        JsonObject result = new JsonObject();
+        if (includeRequestId) {
+            result.addProperty("id", msg.getRequestId());
+        }
+        result.addProperty("method", msg.getMethodName());
+        result.add("params", new JsonParser().parse(msg.getParams()));
+        return result;
+    }
 
     private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
         JsonObject jo = jsonObject.getAsJsonObject();
@@ -112,14 +122,14 @@ public class JsonConverter {
         request.addTsKvList(builder.build());
     }
 
-    public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
+    private static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
         TsKvListProto.Builder builder = TsKvListProto.newBuilder();
         builder.setTs(jo.get("ts").getAsLong());
         builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
         request.addTsKvList(builder.build());
     }
 
-    public static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
+    private static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
         List<KeyValueProto> result = new ArrayList<>();
         for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
             JsonElement element = valueEntry.getValue();
@@ -172,9 +182,9 @@ public class JsonConverter {
         return request;
     }
 
-    public static ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
+    public static TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
         JsonObject object = json.getAsJsonObject();
-        return new ToServerRpcRequestMsg(requestId, object.get("method").getAsString(), GSON.toJson(object.get("params")));
+        return TransportProtos.ToServerRpcRequestMsg.newBuilder().setRequestId(requestId).setMethodName(object.get("method").getAsString()).setParams(GSON.toJson(object.get("params"))).build();
     }
 
     private static void parseObject(BasicTelemetryUploadRequest request, long systemTs, JsonElement jsonObject) {
@@ -368,8 +378,14 @@ public class JsonConverter {
         return result;
     }
 
-    public static JsonElement toJson(ToServerRpcResponseMsg msg) {
-        return new JsonParser().parse(msg.getData());
+    public static JsonElement toJson(TransportProtos.ToServerRpcResponseMsg msg) {
+        if (StringUtils.isEmpty(msg.getError())) {
+            return new JsonParser().parse(msg.getPayload());
+        } else {
+            JsonObject errorMsg = new JsonObject();
+            errorMsg.addProperty("error", msg.getError());
+            return errorMsg;
+        }
     }
 
     public static JsonElement toErrorJson(String errorMsg) {
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
index 17d4a1f..09d1bcb 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,8 +15,11 @@
  */
 package org.thingsboard.server.common.transport;
 
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
 
 /**
  * Created by ashvayka on 04.10.18.
@@ -26,4 +29,10 @@ public interface SessionMsgListener {
     void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
 
     void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
+
+    void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification);
+
+    void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest);
+
+    void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index bf33b1d..cd30419 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.server.common.transport;
 
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@@ -49,6 +51,10 @@ public interface TransportService {
 
     void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
 
+    void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
+
+    void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
+
     void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
 
     void deregisterSession(SessionInfoProto sessionInfo);
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index 66c0d81..bcf5f52 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -137,6 +137,29 @@ message SubscribeToRPCMsg {
   bool unsubscribe = 1;
 }
 
+message ToDeviceRpcRequestMsg {
+  int32 requestId = 1;
+  string methodName = 2;
+  string params = 3;
+}
+
+message ToDeviceRpcResponseMsg {
+  int32 requestId = 1;
+  string payload = 2;
+}
+
+message ToServerRpcRequestMsg {
+  int32 requestId = 1;
+  string methodName = 2;
+  string params = 3;
+}
+
+message ToServerRpcResponseMsg {
+  int32 requestId = 1;
+  string payload = 2;
+  string error = 3;
+}
+
 message TransportToDeviceActorMsg {
   SessionInfoProto sessionInfo = 1;
   SessionEventMsg sessionEvent = 2;
@@ -144,7 +167,9 @@ message TransportToDeviceActorMsg {
   PostAttributeMsg postAttributes = 4;
   GetAttributeRequestMsg getAttributes = 5;
   SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
-  SubscribeToRPCMsg subscribeToRPC= 7;
+  SubscribeToRPCMsg subscribeToRPC = 7;
+  ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
+  ToServerRpcRequestMsg toServerRPCCallRequest = 9;
 }
 
 message DeviceActorToTransportMsg {
@@ -153,6 +178,8 @@ message DeviceActorToTransportMsg {
    SessionCloseNotificationProto sessionCloseNotification = 3;
    GetAttributeResponseMsg getAttributesResponse = 4;
    AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
+   ToDeviceRpcRequestMsg toDeviceRequest = 6;
+   ToServerRpcResponseMsg toServerResponse = 7;
 }
 
 /**
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
index 1c96311..3cd4142 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
@@ -98,7 +98,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
 
         String payload = validatePayload(ctx, inbound);
 
-        return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
+//        return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
+        return null;
     }
 
     @Override
@@ -225,8 +226,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
     private Response convertToServerRpcResponse(SessionContext ctx, ToServerRpcResponseMsg msg) {
         if (msg.isSuccess()) {
             Response response = new Response(ResponseCode.CONTENT);
-            JsonElement result = JsonConverter.toJson(msg);
-            response.setPayload(result.toString());
+//            JsonElement result = JsonConverter.toJson(msg);
+//            response.setPayload(result.toString());
             return response;
         } else {
             return convertError(Optional.of(new RuntimeException("Server RPC response is empty!")));
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index e503409..5269285 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -114,7 +114,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
     }
 
     private void reply(ToServerRpcResponseMsg msg) {
-        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
+//        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
     }
 
     private void reply(AttributesUpdateNotification msg) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index d4ab033..a2be169 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt.adaptors;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
 import io.netty.buffer.ByteBuf;
@@ -99,6 +98,31 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     }
 
     @Override
+    public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+        String topicName = inbound.variableHeader().topicName();
+        try {
+            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
+            String payload = inbound.payload().toString(UTF8);
+            return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
+        } catch (RuntimeException e) {
+            log.warn("Failed to decode get attributes request", e);
+            throw new AdaptorException(e);
+        }
+    }
+
+    @Override
+    public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+        String topicName = inbound.variableHeader().topicName();
+        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
+        try {
+            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
+            return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
+        } catch (IllegalStateException | JsonSyntaxException ex) {
+            throw new AdaptorException(ex);
+        }
+    }
+
+    @Override
     public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
         if (!StringUtils.isEmpty(responseMsg.getError())) {
             throw new AdaptorException(responseMsg.getError());
@@ -115,9 +139,17 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
 
     @Override
     public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
-        return Optional.of(createMqttPublishMsg(ctx,
-                MqttTopics.DEVICE_ATTRIBUTES_TOPIC,
-                JsonConverter.toJson(notificationMsg)));
+        return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg)));
+    }
+
+    @Override
+    public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
+        return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
+    }
+
+    @Override
+    public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
+        return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
     }
 
     @Override
@@ -149,7 +181,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
                 msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound);
                 break;
             case TO_SERVER_RPC_REQUEST:
-                msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);
+                msg = null;//convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);
                 break;
             default:
                 log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type);
@@ -181,13 +213,12 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
                 break;
             case TO_DEVICE_RPC_REQUEST:
                 ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
-                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
-                        rpcRequest);
+                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), rpcRequest);
                 break;
             case TO_SERVER_RPC_RESPONSE:
-                ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
-                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
-                        rpcResponse);
+//                ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
+//                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
+//                        rpcResponse);
                 break;
             case RULE_ENGINE_ERROR:
                 RuleEngineErrorMsg errorMsg = (RuleEngineErrorMsg) msg;
@@ -232,7 +263,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
     }
 
-    private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
+    private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) {
         return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
     }
 
@@ -290,7 +321,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
         String payload = validatePayload(ctx.getSessionId(), inbound.payload());
         try {
-            return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());
+            return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId());
         } catch (IllegalStateException | JsonSyntaxException ex) {
             throw new AdaptorException(ex);
         }
@@ -299,18 +330,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
         String payload = validatePayload(ctx.getSessionId(), inbound.payload());
         try {
-            return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId());
-        } catch (IllegalStateException | JsonSyntaxException ex) {
-            throw new AdaptorException(ex);
-        }
-    }
-
-    private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String topicName = inbound.variableHeader().topicName();
-        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
-        try {
-            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
-            return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
+            return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId());
         } catch (IllegalStateException | JsonSyntaxException ex) {
             throw new AdaptorException(ex);
         }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 602fde1..48173f9 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -19,7 +19,15 @@ import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.thingsboard.server.common.transport.TransportAdaptor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 
 import java.util.Optional;
@@ -29,15 +37,21 @@ import java.util.Optional;
  */
 public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
 
-    TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+    PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
 
-    TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+    PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
 
-    TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+    GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
 
-    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
+    ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
 
-    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
+    ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
 
+    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
 
+    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
+
+    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
+
+    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse);
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index b1ee037..3099b24 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -223,43 +223,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
                 TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
                 transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
+            } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)){
+                TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
+                transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
+            } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)){
+                TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
+                transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
             }
         } catch (AdaptorException e) {
             log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
             log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
             ctx.close();
         }
-//        AdaptorToSessionActorMsg msg = null;
-//        try {
-//            if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
-//                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
-//            } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
-//                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
-//            } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
-//                msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
-//                if (msgId >= 0) {
-//                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-//                }
-//            } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
-//                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
-//                if (msgId >= 0) {
-//                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-//                }
-//            } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
-//                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
-//                if (msgId >= 0) {
-//                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-//                }
-//            }
-//        } catch (AdaptorException e) {
-//            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
-//        }
-//        if (msg != null) {
-//            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-//        } else {
-//            log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
-//            ctx.close();
-//        }
     }
 
     private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
@@ -555,4 +530,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
         }
     }
+
+    @Override
+    public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
+        log.trace("[{}] Received the remote command to close the session", sessionId);
+        processDisconnect(deviceSessionCtx.getChannel());
+    }
+
+    @Override
+    public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
+        log.trace("[{}] Received RPC command to device", sessionId);
+        try {
+            adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+        } catch (Exception e) {
+            log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e);
+        }
+    }
+
+    @Override
+    public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
+        log.trace("[{}] Received RPC command to device", sessionId);
+        try {
+            adaptor.convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+        } catch (Exception e) {
+            log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e);
+        }
+    }
 }
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 648b01c..1539238 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -164,6 +164,15 @@ public class MqttTransportService implements TransportService {
                                         if (toSessionMsg.hasAttributeUpdateNotification()) {
                                             listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
                                         }
+                                        if (toSessionMsg.hasSessionCloseNotification()) {
+                                            listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
+                                        }
+                                        if (toSessionMsg.hasToDeviceRequest()) {
+                                            listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
+                                        }
+                                        if (toSessionMsg.hasToServerResponse()) {
+                                            listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
+                                        }
                                     });
                                 } else {
                                     //TODO: should we notify the device actor about missed session?
@@ -273,6 +282,24 @@ public class MqttTransportService implements TransportService {
     }
 
     @Override
+    public void process(SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setToDeviceRPCCallResponse(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setToServerRPCCallRequest(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
+
+    @Override
     public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
         sessions.putIfAbsent(toId(sessionInfo), listener);
         //TODO: monitor sessions periodically: PING REQ/RESP, etc.