thingsboard-developers

Changes

application/src/main/java/org/thingsboard/server/actors/device/RuleEngineQueuePutAckMsg.java 36(+0 -36)

common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java 52(+0 -52)

common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorQueueTimeoutMsg.java 36(+0 -36)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 8ff61ac..1081ab2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -200,10 +200,6 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
-    private MsgQueueService msgQueueService;
-
-    @Autowired
-    @Getter
     private DeviceStateService deviceStateService;
 
     @Lazy
@@ -269,10 +265,6 @@ public class ActorSystemContext {
 
     @Getter
     @Setter
-    private ActorRef sessionManagerActor;
-
-    @Getter
-    @Setter
     private ActorRef statsActor;
 
     @Getter
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 14ca586..f4373db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -38,7 +38,6 @@ import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.model.ModelConstants;
@@ -113,19 +112,12 @@ public class AppActor extends RuleChainManagerActor {
             case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
                 onToDeviceActorMsg((TenantAwareMsg) msg);
                 break;
-            case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
-                onToDeviceSessionMsg((BasicActorSystemToDeviceSessionActorMsg) msg);
-                break;
             default:
                 return false;
         }
         return true;
     }
 
-    private void onToDeviceSessionMsg(BasicActorSystemToDeviceSessionActorMsg msg) {
-        systemContext.getSessionManagerActor().tell(msg, self());
-    }
-
     private void onPossibleClusterMsg(SendToClusterMsg msg) {
         Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
         if (address.isPresent()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 7fabe49..bd2a0f4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -74,12 +73,6 @@ public class DeviceActor extends ContextAwareActor {
             case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
                 processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
                 break;
-            case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG:
-                processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg);
-                break;
-            case RULE_ENGINE_QUEUE_PUT_ACK_MSG:
-                processor.processQueueAck(context(), (RuleEngineQueuePutAckMsg) msg);
-                break;
             default:
                 return false;
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 1313f60..0822766 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,6 @@
 package org.thingsboard.server.actors.device;
 
 import akka.actor.ActorContext;
-import akka.actor.ActorRef;
 import akka.event.LoggingAdapter;
 import com.datastax.driver.core.utils.UUIDs;
 import com.google.common.util.concurrent.FutureCallback;
@@ -46,29 +45,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
-import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
-import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
-import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
-import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
-import org.thingsboard.server.common.msg.core.GetAttributesRequest;
 import org.thingsboard.server.common.msg.core.RuleEngineError;
 import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
-import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.SessionCloseNotification;
-import org.thingsboard.server.common.msg.core.SessionOpenMsg;
-import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.session.SessionType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
-import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@@ -88,9 +73,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.thingsboard.server.gen.transport.TransportProtos.*;
@@ -192,19 +175,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
-        PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
-        if (data != null && data.isReplyOnQueueAck()) {
-            int remainingAcks = data.getAckMsgCount() - 1;
-            data.setAckMsgCount(remainingAcks);
-            logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
-            if (remainingAcks == 0) {
-                ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
-                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
-            }
-        }
-    }
-
     private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
         if (!toDeviceRpcPendingMap.isEmpty()) {
             logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
@@ -239,8 +209,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                     body.getMethod(),
                     body.getParams()
             );
-            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
-            sendMsgToSessionActor(response, server);
+//            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
+//            sendMsgToSessionActor(response, server);
         };
     }
 
@@ -292,57 +262,25 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
         ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
         ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
-
-        Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
-            @Override
-            public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
-                systemContext.getRuleEngineTransportService().process();
-                BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
-                        request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
-                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                if (t instanceof Exception) {
-                    ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
-                    sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
-                } else {
-                    logger.error("[{}] Failed to process attributes request", deviceId, t);
-                }
-            }
-        });
-    }
-
-    private Optional<Set<String>> toOptionalSet(List<String> strings) {
-        if (strings == null || strings.isEmpty()) {
-            return Optional.empty();
-        } else {
-            return Optional.of(new HashSet<>(strings));
-        }
-    }
-
-    private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
-        GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
-        ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
-        ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, request.getSharedAttributeNames());
-
+        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        int requestId = request.getRequestId();
         Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
             @Override
             public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
-                BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
-                        request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
-                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+                GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
+                        .setRequestId(requestId)
+                        .addAllClientAttributeList(toTsKvProtos(result.get(0)))
+                        .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
+                        .build();
+                sendToTransport(responseMsg, sessionId, sessionInfo);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                if (t instanceof Exception) {
-                    ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
-                    sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
-                } else {
-                    logger.error("[{}] Failed to process attributes request", deviceId, t);
-                }
+                GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
+                        .setError(t.getMessage())
+                        .build();
+                sendToTransport(responseMsg, sessionId, sessionInfo);
             }
         });
     }
@@ -376,36 +314,36 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
-        ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
-
-        JsonObject json = new JsonObject();
-        json.addProperty("method", request.getMethod());
-        json.add("params", jsonParser.parse(request.getParams()));
-
-        TbMsgMetaData requestMetaData = defaultMetaData.copy();
-        requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
-        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
-        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
-        pushToRuleEngineWithTimeout(context, tbMsg, msgData);
-
-        scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
-        toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
-    }
+//    private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
+//        ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
+//
+//        JsonObject json = new JsonObject();
+//        json.addProperty("method", request.getMethod());
+//        json.add("params", jsonParser.parse(request.getParams()));
+//
+//        TbMsgMetaData requestMetaData = defaultMetaData.copy();
+//        requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
+//        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+//        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
+//        pushToRuleEngineWithTimeout(context, tbMsg, msgData);
+//
+//        scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
+//        toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
+//    }
 
     public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
         if (data != null) {
             logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
             ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
-            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+//            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
         }
     }
 
     void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
         if (data != null) {
-            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+//            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
         }
     }
 
@@ -433,68 +371,68 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             }
             if (notification != null) {
                 ToDeviceMsg finalNotification = notification;
-                attributeSubscriptions.entrySet().forEach(sub -> {
-                    ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
-                    sendMsgToSessionActor(response, sub.getValue().getServer());
-                });
+//                attributeSubscriptions.entrySet().forEach(sub -> {
+//                    ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
+//                    sendMsgToSessionActor(response, sub.getValue().getServer());
+//                });
             }
         } else {
             logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
         }
     }
 
-    private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
-        SessionId sessionId = msg.getSessionId();
-        FromDeviceMsg inMsg = msg.getPayload();
-        if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
-            logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
-            ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
-            ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
-            boolean success = requestMd != null;
-            if (success) {
-                systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
-                        requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
-            } else {
-                logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
-            }
-            if (msg.getSessionType() == SessionType.SYNC) {
-                BasicCommandAckResponse response = success
-                        ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
-                        : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
-                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
-            }
-        }
-    }
+//    private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
+//        SessionId sessionId = msg.getSessionId();
+//        FromDeviceMsg inMsg = msg.getPayload();
+//        if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
+//            logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
+//            ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
+//            ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
+//            boolean success = requestMd != null;
+//            if (success) {
+//                systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+//                        requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
+//            } else {
+//                logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
+//            }
+//            if (msg.getSessionType() == SessionType.SYNC) {
+//                BasicCommandAckResponse response = success
+//                        ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
+//                        : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
+//                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
+//            }
+//        }
+//    }
 
     void processClusterEventMsg(ClusterEventMsg msg) {
-        if (!msg.isAdded()) {
-            logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
-            Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
-                    .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
-            attributeSubscriptions.entrySet().removeIf(filter);
-            rpcSubscriptions.entrySet().removeIf(filter);
-        }
+//        if (!msg.isAdded()) {
+//            logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
+//            Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
+//                    .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
+//            attributeSubscriptions.entrySet().removeIf(filter);
+//            rpcSubscriptions.entrySet().removeIf(filter);
+//        }
     }
 
-    private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
-        SessionId sessionId = msg.getSessionId();
-        SessionType sessionType = msg.getSessionType();
-        FromDeviceMsg inMsg = msg.getPayload();
-        if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
-            logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
-            attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
-        } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
-            logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
-            attributeSubscriptions.remove(sessionId);
-        } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
-            logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
-            rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
-            sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
-        } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
-            logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
-            rpcSubscriptions.remove(sessionId);
-        }
-    }
+//    private void processSubscriptionCommands(ActorContext context, DeviceToDeviceActorMsg msg) {
+//        SessionId sessionId = msg.getSessionId();
+//        SessionType sessionType = msg.getSessionType();
+//        FromDeviceMsg inMsg = msg.getPayload();
+//        if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
+//            logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
+//            attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
+//        } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
+//            logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
+//            attributeSubscriptions.remove(sessionId);
+//        } else if (inMsg.getMsgType() == SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
+//            logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
+//            rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
+//            sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
+//        } else if (inMsg.getMsgType() == SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
+//            logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
+//            rpcSubscriptions.remove(sessionId);
+//        }
+//    }
 
     private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
         UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
@@ -506,15 +444,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                     closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
                 }
             }
-            sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
+            sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId()));
             if (sessions.size() == 1) {
                 reportSessionOpen();
             }
-        }
-        FromDeviceMsg inMsg = msg.getPayload();
-        if (inMsg instanceof SessionOpenMsg) {
-            logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
-        } else if (inMsg instanceof SessionCloseMsg) {
+        } else if (msg.getEvent() == SessionEvent.CLOSED) {
             logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
             sessions.remove(sessionId);
             attributeSubscriptions.remove(sessionId);
@@ -532,7 +466,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             systemContext.getRpcService().tell(systemContext.getEncodingService()
                     .convertToProtoDataMessage(sessionAddress.get(), response));
         } else {
-            systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
+//            systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
         }
     }
 
@@ -578,4 +512,62 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
         return json;
     }
+
+    private Optional<Set<String>> toOptionalSet(List<String> strings) {
+        if (strings == null || strings.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new HashSet<>(strings));
+        }
+    }
+
+    private void sendToTransport(GetAttributeResponseMsg responseMsg, UUID sessionId, SessionInfoProto sessionInfo) {
+        DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+                .setSessionIdMSB(sessionId.getMostSignificantBits())
+                .setSessionIdLSB(sessionId.getLeastSignificantBits())
+                .setGetAttributesResponse(responseMsg).build();
+        systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+    }
+
+    private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
+        List<TsKvProto> clientAttributes;
+        if (result == null || result.isEmpty()) {
+            clientAttributes = Collections.emptyList();
+        } else {
+            clientAttributes = new ArrayList<>(result.size());
+            for (AttributeKvEntry attrEntry : result) {
+                clientAttributes.add(toTsKvProto(attrEntry));
+            }
+        }
+        return clientAttributes;
+    }
+
+    private TsKvProto toTsKvProto(AttributeKvEntry attrEntry) {
+        return TsKvProto.newBuilder().setTs(attrEntry.getLastUpdateTs())
+                .setKv(toKeyValueProto(attrEntry)).build();
+    }
+
+    private KeyValueProto toKeyValueProto(KvEntry kvEntry) {
+        KeyValueProto.Builder builder = KeyValueProto.newBuilder();
+        builder.setKey(kvEntry.getKey());
+        switch (kvEntry.getDataType()) {
+            case BOOLEAN:
+                builder.setType(KeyValueType.BOOLEAN_V);
+                builder.setBoolV(kvEntry.getBooleanValue().get());
+                break;
+            case DOUBLE:
+                builder.setType(KeyValueType.DOUBLE_V);
+                builder.setDoubleV(kvEntry.getDoubleValue().get());
+                break;
+            case LONG:
+                builder.setType(KeyValueType.LONG_V);
+                builder.setLongV(kvEntry.getLongValue().get());
+                break;
+            case STRING:
+                builder.setType(KeyValueType.STRING_V);
+                builder.setStringV(kvEntry.getStrValue().get());
+                break;
+        }
+        return builder.build();
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 9faaade..32cb60d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index fe02335..3da90d1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -25,7 +25,6 @@ import java.util.Optional;
 
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
-import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
 import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
 import org.thingsboard.server.common.data.EntityType;
@@ -90,26 +89,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
             }
             initRoutes(ruleChain, ruleNodeList);
-            reprocess(ruleNodeList);
             started = true;
         } else {
             onUpdate(context);
         }
     }
 
-    private void reprocess(List<RuleNode> ruleNodeList) {
-        for (RuleNode ruleNode : ruleNodeList) {
-            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
-                pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
-            }
-        }
-        if (firstNode != null) {
-            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), systemContext.getQueuePartitionId())) {
-                pushMsgToNode(firstNode, tbMsg, "");
-            }
-        }
-    }
-
     @Override
     public void onUpdate(ActorContext context) throws Exception {
         RuleChain ruleChain = service.findRuleChainById(entityId);
@@ -134,7 +119,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         });
 
         initRoutes(ruleChain, ruleNodeList);
-        reprocess(ruleNodeList);
     }
 
     @Override
@@ -188,17 +172,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
         checkActive();
         if (firstNode != null) {
-            putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+            pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
         }
     }
 
     void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
         checkActive();
         if (firstNode != null) {
-            putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
-                pushMsgToNode(firstNode, msg, "");
-                envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
-            });
+            pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
         }
     }
 
@@ -206,15 +187,16 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         checkActive();
         if (envelope.isEnqueue()) {
             if (firstNode != null) {
-                putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+                pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getMsg()), envelope.getFromRelationType());
             }
         } else {
             if (firstNode != null) {
                 pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
             } else {
-                TbMsg msg = envelope.getMsg();
-                EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
-                queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
+//                TODO: Ack this message in Kafka
+//                TbMsg msg = envelope.getMsg();
+//                EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+//                queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
             }
         }
     }
@@ -249,7 +231,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
         if (relationsCount == 0) {
             if (ackId != null) {
-                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+//                TODO: Ack this message in Kafka
+//                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
             }
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
@@ -269,7 +252,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
             }
             //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
             if (ackId != null) {
-                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+//                TODO: Ack this message in Kafka
+//                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
             }
         }
     }
@@ -296,7 +280,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         RuleNodeId targetId = new RuleNodeId(target.getId());
         RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
         TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
-        putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg, fromRelationType));
+        pushMsgToNode(targetNodeCtx, copy, fromRelationType);
     }
 
     private void pushToTarget(TbMsg msg, EntityId target, String fromRelationType) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 1e9e23b..b70d1ed 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -30,7 +30,6 @@ import org.thingsboard.server.actors.app.AppActor;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcManagerActor;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.session.SessionManagerActor;
 import org.thingsboard.server.actors.stats.StatsActor;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
@@ -90,8 +89,6 @@ public class DefaultActorService implements ActorService {
 
     private ActorRef appActor;
 
-    private ActorRef sessionManagerActor;
-
     private ActorRef rpcManagerActor;
 
     @PostConstruct
@@ -104,10 +101,6 @@ public class DefaultActorService implements ActorService {
         appActor = system.actorOf(Props.create(new AppActor.ActorCreator(actorContext)).withDispatcher(APP_DISPATCHER_NAME), "appActor");
         actorContext.setAppActor(appActor);
 
-        sessionManagerActor = system.actorOf(Props.create(new SessionManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
-                "sessionManagerActor");
-        actorContext.setSessionManagerActor(sessionManagerActor);
-
         rpcManagerActor = system.actorOf(Props.create(new RpcManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
                 "rpcManagerActor");
 
@@ -135,12 +128,6 @@ public class DefaultActorService implements ActorService {
     }
 
     @Override
-    public void process(SessionAwareMsg msg) {
-        log.debug("Processing session aware msg: {}", msg);
-        sessionManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
     public void onServerAdded(ServerInstance server) {
         log.trace("Processing onServerAdded msg: {}", server);
         broadcast(new ClusterEventMsg(server.getServerAddress(), true));
@@ -194,7 +181,6 @@ public class DefaultActorService implements ActorService {
 
     private void broadcast(ClusterEventMsg msg) {
         this.appActor.tell(msg, ActorRef.noSender());
-        this.sessionManagerActor.tell(msg, ActorRef.noSender());
         this.rpcManagerActor.tell(msg, ActorRef.noSender());
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 1cf8339..3441abc 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -35,14 +35,12 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected final TenantId tenantId;
     protected final T entityId;
-    protected final MsgQueueService queue;
     protected ComponentLifecycleState state;
 
     protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
         super(systemContext, logger);
         this.tenantId = tenantId;
         this.entityId = id;
-        this.queue = systemContext.getMsgQueueService();
     }
 
     public abstract void start(ActorContext context) throws Exception;
@@ -86,18 +84,4 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
         }
     }
 
-    protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
-        EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
-        Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(@Nullable Void result) {
-                onSuccess.accept(tbMsg);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                logger.debug("Failed to push message [{}] to queue due to [{}]", tbMsg, t);
-            }
-        });
-    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
index 3ee1858..570082d 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.transport.msg;
 
 import lombok.Data;
@@ -7,7 +22,6 @@ import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
 
 import java.io.Serializable;
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 6b82792..aa36b9b 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.transport;
 
 import akka.actor.ActorRef;
@@ -30,6 +45,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
@@ -136,6 +152,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
     @Override
     public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
         notificationsProducer.send(notificationsTopic + "." + nodeId,
+                new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(),
                 ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
                 , new QueueCallbackAdaptor(onSuccess, onFailure));
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
index e6b3dd3..8ca484f 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,6 @@
 package org.thingsboard.server.service.transport;
 
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.;
 
 import java.util.function.Consumer;
 
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
index 5f3c026..bb0f5b6 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index dfd8f98..24758b5 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -91,8 +91,6 @@ public enum MsgType {
 
     DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
 
-    DEVICE_ACTOR_QUEUE_TIMEOUT_MSG,
-
     /**
      * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
      */
@@ -101,7 +99,6 @@ public enum MsgType {
     /**
      * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
      */
-    RULE_ENGINE_QUEUE_PUT_ACK_MSG,
     ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
     TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
     SESSION_TIMEOUT_MSG,
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 90f15a4..86be3a3 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 8f4c095..610a490 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -106,6 +106,10 @@ public class TBKafkaProducerTemplate<T> {
         return send(topic, key, value, null, headers, callback);
     }
 
+    public Future<RecordMetadata> send(String topic, String key, T value, Callback callback) {
+        return send(topic, key, value, null, null, callback);
+    }
+
     public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
         byte[] data = encoder.encode(value);
         ProducerRecord<String, byte[]> record;
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index ba08bf8..77ad033 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -160,7 +160,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
         SettableFuture<Response> future = SettableFuture.create();
         pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
         request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
-        requestTemplate.send(key, request, headers);
+        requestTemplate.send(key, request, headers, null);
         return future;
     }
 
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 6f9e8a7..4c23ac2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index e7c1734..6bb8eff 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -15,18 +15,38 @@
  */
 package org.thingsboard.server.common.transport.adaptor;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSyntaxException;
+import org.thingsboard.server.common.data.kv.AttributeKey;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DoubleDataEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
+import org.thingsboard.server.common.msg.core.BasicAttributesUpdateRequest;
+import org.thingsboard.server.common.msg.core.BasicRequest;
+import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
+import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
+import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import com.google.gson.*;
-import org.thingsboard.server.common.msg.core.*;
-
-import org.thingsboard.server.common.data.kv.*;
-import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
-
 public class JsonConverter {
 
     private static final Gson GSON = new Gson();
@@ -44,6 +64,109 @@ public class JsonConverter {
         return convertToTelemetry(jsonObject, System.currentTimeMillis(), requestId);
     }
 
+    public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException {
+        long systemTs = System.currentTimeMillis();
+        PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
+        if (jsonObject.isJsonObject()) {
+            parseObject(builder, systemTs, jsonObject);
+        } else if (jsonObject.isJsonArray()) {
+            jsonObject.getAsJsonArray().forEach(je -> {
+                if (je.isJsonObject()) {
+                    parseObject(builder, systemTs, je.getAsJsonObject());
+                } else {
+                    throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
+                }
+            });
+        } else {
+            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
+        }
+        return builder.build();
+    }
+
+    public static PostAttributeMsg convertToAttributesProto(JsonElement jsonObject) throws JsonSyntaxException {
+        if (jsonObject.isJsonObject()) {
+            PostAttributeMsg.Builder result = PostAttributeMsg.newBuilder();
+            List<KeyValueProto> keyValueList = parseProtoValues(jsonObject.getAsJsonObject());
+            result.addAllKv(keyValueList);
+            return result.build();
+        } else {
+            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
+        }
+    }
+
+
+    private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
+        JsonObject jo = jsonObject.getAsJsonObject();
+        if (jo.has("ts") && jo.has("values")) {
+            parseWithTs(builder, jo);
+        } else {
+            parseWithoutTs(builder, systemTs, jo);
+        }
+    }
+
+    private static void parseWithoutTs(PostTelemetryMsg.Builder request, long systemTs, JsonObject jo) {
+        TsKvListProto.Builder builder = TsKvListProto.newBuilder();
+        builder.setTs(systemTs);
+        builder.addAllKv(parseProtoValues(jo));
+        request.addTsKvList(builder.build());
+    }
+
+    public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
+        TsKvListProto.Builder builder = TsKvListProto.newBuilder();
+        builder.setTs(jo.get("ts").getAsLong());
+        builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
+        request.addTsKvList(builder.build());
+    }
+
+    public static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
+        List<KeyValueProto> result = new ArrayList<>();
+        for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
+            JsonElement element = valueEntry.getValue();
+            if (element.isJsonPrimitive()) {
+                JsonPrimitive value = element.getAsJsonPrimitive();
+                if (value.isString()) {
+                    result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.STRING_V)
+                            .setStringV(value.getAsString()).build());
+                } else if (value.isBoolean()) {
+                    result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.BOOLEAN_V)
+                            .setBoolV(value.getAsBoolean()).build());
+                } else if (value.isNumber()) {
+                    if (value.getAsString().contains(".")) {
+                        result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.DOUBLE_V)
+                                .setDoubleV(value.getAsDouble()).build());
+                    } else {
+                        try {
+                            long longValue = Long.parseLong(value.getAsString());
+                            result.add(KeyValueProto.newBuilder().setKey(valueEntry.getKey()).setType(KeyValueType.LONG_V)
+                                    .setLongV(longValue).build());
+                        } catch (NumberFormatException e) {
+                            throw new JsonSyntaxException("Big integer values are not supported!");
+                        }
+                    }
+                } else {
+                    throw new JsonSyntaxException(CAN_T_PARSE_VALUE + value);
+                }
+            } else {
+                throw new JsonSyntaxException(CAN_T_PARSE_VALUE + element);
+            }
+        }
+        return result;
+    }
+
+    private static void parseNumericProto(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
+        if (value.getAsString().contains(".")) {
+            result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
+        } else {
+            try {
+                long longValue = Long.parseLong(value.getAsString());
+                result.add(new LongDataEntry(valueEntry.getKey(), longValue));
+            } catch (NumberFormatException e) {
+                throw new JsonSyntaxException("Big integer values are not supported!");
+            }
+        }
+    }
+
+
     private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
         BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
         if (jsonObject.isJsonObject()) {
@@ -140,6 +263,26 @@ public class JsonConverter {
         }
     }
 
+    public static JsonObject toJson(GetAttributeResponseMsg payload) {
+        JsonObject result = new JsonObject();
+        if (payload.getClientAttributeListCount() > 0) {
+            JsonObject attrObject = new JsonObject();
+            payload.getClientAttributeListList().forEach(addToObjectFromProto(attrObject));
+            result.add("client", attrObject);
+        }
+        if (payload.getSharedAttributeListCount() > 0) {
+            JsonObject attrObject = new JsonObject();
+            payload.getSharedAttributeListList().forEach(addToObjectFromProto(attrObject));
+            result.add("shared", attrObject);
+        }
+        if (payload.getDeletedAttributeKeysCount() > 0) {
+            JsonArray attrObject = new JsonArray();
+            payload.getDeletedAttributeKeysList().forEach(attrObject::add);
+            result.add("deleted", attrObject);
+        }
+        return result;
+    }
+
     public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
         JsonObject result = new JsonObject();
         if (asMap) {
@@ -166,8 +309,29 @@ public class JsonConverter {
     }
 
     private static Consumer<AttributeKey> addToObject(JsonArray result) {
-        return key -> {
-            result.add(key.getAttributeKey());
+        return key -> result.add(key.getAttributeKey());
+    }
+
+    private static Consumer<TsKvProto> addToObjectFromProto(JsonObject result) {
+        return de -> {
+            JsonPrimitive value;
+            switch (de.getKv().getType()) {
+                case BOOLEAN_V:
+                    value = new JsonPrimitive(de.getKv().getBoolV());
+                    break;
+                case DOUBLE_V:
+                    value = new JsonPrimitive(de.getKv().getDoubleV());
+                    break;
+                case LONG_V:
+                    value = new JsonPrimitive(de.getKv().getLongV());
+                    break;
+                case STRING_V:
+                    value = new JsonPrimitive(de.getKv().getStringV());
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported data type: " + de.getKv().getType());
+            }
+            result.add(de.getKv().getKey(), value);
         };
     }
 
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 3068974..b46cfb8 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
index d0e6d18..44fcd2a 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java
index 4cff643..80b2690 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgProcessor.java
@@ -20,8 +20,6 @@ import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 
 public interface SessionMsgProcessor {
 
-    void process(SessionAwareMsg msg);
-
     void onDeviceAdded(Device device);
 
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java
index 080f874..86067d5 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportAdaptor.java
@@ -20,6 +20,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
 import org.thingsboard.server.common.msg.session.SessionContext;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.gen.transport.TransportProtos;
 
 import java.util.Optional;
 
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index dc2e306..c3817a9 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
 
 /**
  * Created by ashvayka on 04.10.18.
@@ -40,6 +41,8 @@ public interface TransportService {
 
     void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
 
+    void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
+
     void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
 
     void deregisterSession(SessionInfoProto sessionInfo);
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index f3b3ae8..d32bac9 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -23,12 +23,13 @@ option java_outer_classname = "TransportProtos";
  * Data Structures;
  */
 message SessionInfoProto {
-  int64 sessionIdMSB = 1;
-  int64 sessionIdLSB = 2;
-  int64 tenantIdMSB = 3;
-  int64 tenantIdLSB = 4;
-  int64 deviceIdMSB = 5;
-  int64 deviceIdLSB = 6;
+  string nodeId = 1;
+  int64 sessionIdMSB = 2;
+  int64 sessionIdLSB = 3;
+  int64 tenantIdMSB = 4;
+  int64 tenantIdLSB = 5;
+  int64 deviceIdMSB = 6;
+  int64 deviceIdLSB = 7;
 }
 
 enum SessionEvent {
@@ -57,6 +58,11 @@ message KeyValueProto {
   string string_v = 6;
 }
 
+message TsKvProto {
+  int64 ts = 1;
+  KeyValueProto kv = 2;
+}
+
 message TsKvListProto {
   int64 ts = 1;
   repeated KeyValueProto kv = 2;
@@ -76,9 +82,8 @@ message DeviceInfoProto {
  * Messages that use Data Structures;
  */
 message SessionEventMsg {
-  string nodeId = 1;
-  SessionType sessionType = 2;
-  SessionEvent event = 3;
+  SessionType sessionType = 1;
+  SessionEvent event = 2;
 }
 
 message PostTelemetryMsg {
@@ -90,14 +95,17 @@ message PostAttributeMsg {
 }
 
 message GetAttributeRequestMsg {
-  repeated string clientAttributeNames = 1;
-  repeated string sharedAttributeNames = 2;
+  int32 requestId = 1;
+  repeated string clientAttributeNames = 2;
+  repeated string sharedAttributeNames = 3;
 }
 
 message GetAttributeResponseMsg {
-  repeated TsKvListProto clientAttributeList = 1;
-  repeated TsKvListProto sharedAttributeList = 2;
-  repeated string deletedAttributeKeys = 3;
+  int32 requestId = 1;
+  repeated TsKvProto clientAttributeList = 2;
+  repeated TsKvProto sharedAttributeList = 3;
+  repeated string deletedAttributeKeys = 4;
+  string error = 5;
 }
 
 message ValidateDeviceTokenRequestMsg {
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index e2fd7cd..ec69cec 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -106,30 +106,30 @@ public class CoapServerTest {
         public static SessionMsgProcessor sessionMsgProcessor() {
             return new SessionMsgProcessor() {
 
-                @Override
-                public void process(SessionAwareMsg toActorMsg) {
-                    if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
-                        AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
-                        try {
-                            FromDeviceMsg deviceMsg = sessionMsg.getMsg();
-                            ToDeviceMsg toDeviceMsg = null;
-                            if (deviceMsg.getMsgType() == SessionMsgType.POST_TELEMETRY_REQUEST) {
-                                toDeviceMsg = BasicStatusCodeResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID);
-                            } else if (deviceMsg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {
-                                List<AttributeKvEntry> data = new ArrayList<>();
-                                data.add(new BaseAttributeKvEntry(new StringDataEntry("key1", "value1"), System.currentTimeMillis()));
-                                data.add(new BaseAttributeKvEntry(new LongDataEntry("key2", 42L), System.currentTimeMillis()));
-                                BasicAttributeKVMsg kv = BasicAttributeKVMsg.fromClient(data);
-                                toDeviceMsg = BasicGetAttributesResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID, kv);
-                            }
-                            if (toDeviceMsg != null) {
-                                sessionMsg.getSessionContext().onMsg(new BasicSessionActorToAdaptorMsg(sessionMsg.getSessionContext(), toDeviceMsg));
-                            }
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
+//                @Override
+//                public void process(SessionAwareMsg toActorMsg) {
+//                    if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
+//                        AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
+//                        try {
+//                            FromDeviceMsg deviceMsg = sessionMsg.getMsg();
+//                            ToDeviceMsg toDeviceMsg = null;
+//                            if (deviceMsg.getMsgType() == SessionMsgType.POST_TELEMETRY_REQUEST) {
+//                                toDeviceMsg = BasicStatusCodeResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID);
+//                            } else if (deviceMsg.getMsgType() == SessionMsgType.GET_ATTRIBUTES_REQUEST) {
+//                                List<AttributeKvEntry> data = new ArrayList<>();
+//                                data.add(new BaseAttributeKvEntry(new StringDataEntry("key1", "value1"), System.currentTimeMillis()));
+//                                data.add(new BaseAttributeKvEntry(new LongDataEntry("key2", 42L), System.currentTimeMillis()));
+//                                BasicAttributeKVMsg kv = BasicAttributeKVMsg.fromClient(data);
+//                                toDeviceMsg = BasicGetAttributesResponse.onSuccess(deviceMsg.getMsgType(), BasicRequest.DEFAULT_REQUEST_ID, kv);
+//                            }
+//                            if (toDeviceMsg != null) {
+//                                sessionMsg.getSessionContext().onMsg(new BasicSessionActorToAdaptorMsg(sessionMsg.getSessionContext(), toDeviceMsg));
+//                            }
+//                        } catch (Exception e) {
+//                            e.printStackTrace();
+//                        }
+//                    }
+//                }
 
                 @Override
                 public void onDeviceAdded(Device device) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index c8baaf7..ee41d2c 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.adaptors;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
 import io.netty.buffer.ByteBuf;
@@ -25,12 +26,14 @@ import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.handler.codec.mqtt.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
 import org.thingsboard.server.common.msg.session.*;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.transport.mqtt.MqttTopics;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@@ -53,6 +56,64 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
 
     @Override
+    public TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
+        try {
+            return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
+        } catch (IllegalStateException | JsonSyntaxException ex) {
+            throw new AdaptorException(ex);
+        }
+    }
+
+    @Override
+    public TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
+        try {
+            return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
+        } catch (IllegalStateException | JsonSyntaxException ex) {
+            throw new AdaptorException(ex);
+        }
+    }
+
+    @Override
+    public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+        String topicName = inbound.variableHeader().topicName();
+        try {
+            TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
+            result.setRequestId(Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())));
+            String payload = inbound.payload().toString(UTF8);
+            JsonElement requestBody = new JsonParser().parse(payload);
+            Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
+            Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
+            if (clientKeys != null) {
+                result.addAllClientAttributeNames(clientKeys);
+            }
+            if (sharedKeys != null) {
+                result.addAllSharedAttributeNames(sharedKeys);
+            }
+            return result.build();
+        } catch (RuntimeException e) {
+            log.warn("Failed to decode get attributes request", e);
+            throw new AdaptorException(e);
+        }
+    }
+
+    @Override
+    public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
+        if (!StringUtils.isEmpty(responseMsg.getError())) {
+            throw new AdaptorException(responseMsg.getError());
+        } else {
+            Integer requestId = responseMsg.getRequestId();
+            if (requestId >= 0) {
+                return Optional.of(createMqttPublishMsg(ctx,
+                        MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
+                        JsonConverter.toJson(responseMsg)));
+            }
+            return Optional.empty();
+        }
+    }
+
+    @Override
     public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {
         FromDeviceMsg msg;
         switch (type) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index bf83a1f..54e7c5c 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -16,11 +16,24 @@
 package org.thingsboard.server.transport.mqtt.adaptors;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.thingsboard.server.common.transport.TransportAdaptor;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 
+import java.util.Optional;
+
 /**
  * @author Andrew Shvayka
  */
 public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
+
+    TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+
+    TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+
+    TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
+
+    Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index d8f1d10..c64ab5d 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,18 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
 import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
 import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.transport.SessionMsgListener;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.quota.QuotaService;
 import org.thingsboard.server.dao.EncryptionUtil;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
@@ -54,6 +61,7 @@ import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.security.cert.X509Certificate;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -65,14 +73,16 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUS
 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
 import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
 import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
 
 /**
  * @author Andrew Shvayka
  */
 @Slf4j
-public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
+public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
 
     public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
 
@@ -84,8 +94,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private final SslHandler sslHandler;
     private final ConcurrentMap<String, Integer> mqttQoSMap;
 
-    private final SessionInfoProto sessionInfo;
-
+    private volatile SessionInfoProto sessionInfo;
     private volatile InetSocketAddress address;
     private volatile DeviceSessionCtx deviceSessionCtx;
     private volatile GatewaySessionCtx gatewaySessionCtx;
@@ -98,11 +107,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         this.quotaService = context.getQuotaService();
         this.sslHandler = context.getSslHandler();
         this.mqttQoSMap = new ConcurrentHashMap<>();
-        this.sessionInfo = SessionInfoProto.newBuilder()
-                .setNodeId(context.getNodeId())
-                .setSessionIdMSB(sessionId.getMostSignificantBits())
-                .setSessionIdLSB(sessionId.getLeastSignificantBits())
-                .build();
         this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap);
     }
 
@@ -135,15 +139,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             case CONNECT:
                 processConnect(ctx, (MqttConnectMessage) msg);
                 break;
-//            case PUBLISH:
-//                processPublish(ctx, (MqttPublishMessage) msg);
-//                break;
-//            case SUBSCRIBE:
-//                processSubscribe(ctx, (MqttSubscribeMessage) msg);
-//                break;
-//            case UNSUBSCRIBE:
-//                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
-//                break;
+            case PUBLISH:
+                processPublish(ctx, (MqttPublishMessage) msg);
+                break;
+            case SUBSCRIBE:
+                processSubscribe(ctx, (MqttSubscribeMessage) msg);
+                break;
+            case UNSUBSCRIBE:
+                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
+                break;
 //            case PINGREQ:
 //                if (checkConnected(ctx)) {
 //                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
@@ -160,24 +164,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     }
 
-//    private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
-//        if (!checkConnected(ctx)) {
-//            return;
-//        }
-//        String topicName = mqttMsg.variableHeader().topicName();
-//        int msgId = mqttMsg.variableHeader().packetId();
-//        log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
-//
-//        if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
-//            if (gatewaySessionCtx != null) {
-//                gatewaySessionCtx.setChannel(ctx);
+    private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
+        if (!checkConnected(ctx)) {
+            return;
+        }
+        String topicName = mqttMsg.variableHeader().topicName();
+        int msgId = mqttMsg.variableHeader().packetId();
+        log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+
+        if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
+            if (gatewaySessionCtx != null) {
+                gatewaySessionCtx.setChannel(ctx);
 //                handleMqttPublishMsg(topicName, msgId, mqttMsg);
-//            }
-//        } else {
-//            processDevicePublish(ctx, mqttMsg, topicName, msgId);
-//        }
-//    }
-//
+            }
+        } else {
+            processDevicePublish(ctx, mqttMsg, topicName, msgId);
+        }
+    }
+
+    //
 //    private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
 //        try {
 //            switch (topicName) {
@@ -205,7 +210,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 //        }
 //    }
 //
-//    private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
+    private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
+        try {
+            if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
+                TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
+                transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
+            } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) {
+                TransportProtos.PostAttributeMsg postAttributeMsg = adaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
+                transportService.process(sessionInfo, postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
+            } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
+                TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
+                transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
+            }
+        } catch (AdaptorException e) {
+            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+            log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
+            ctx.close();
+        }
 //        AdaptorToSessionActorMsg msg = null;
 //        try {
 //            if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
@@ -237,20 +258,38 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 //            log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
 //            ctx.close();
 //        }
-//    }
-//
-//    private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
-//        if (!checkConnected(ctx)) {
-//            return;
-//        }
-//        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
-//        List<Integer> grantedQoSList = new ArrayList<>();
-//        for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
-//            String topic = subscription.topicName();
-//            MqttQoS reqQoS = subscription.qualityOfService();
-//            try {
-//                switch (topic) {
-//                    case DEVICE_ATTRIBUTES_TOPIC: {
+    }
+
+    private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
+        return new TransportServiceCallback<Void>() {
+            @Override
+            public void onSuccess(Void dummy) {
+                log.trace("[{}] Published msg: {}", sessionId, msg);
+                if (msgId > 0) {
+                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
+                ctx.close();
+            }
+        };
+    }
+
+    private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
+        if (!checkConnected(ctx)) {
+            return;
+        }
+        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+        List<Integer> grantedQoSList = new ArrayList<>();
+        for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
+            String topic = subscription.topicName();
+            MqttQoS reqQoS = subscription.qualityOfService();
+            try {
+                switch (topic) {
+//                    case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
 //                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
 //                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
 //                        registerSubQoS(topic, grantedQoSList, reqQoS);
@@ -267,37 +306,37 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 //                    case GATEWAY_RPC_TOPIC:
 //                        registerSubQoS(topic, grantedQoSList, reqQoS);
 //                        break;
-//                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-//                        deviceSessionCtx.setAllowAttributeResponses();
-//                        registerSubQoS(topic, grantedQoSList, reqQoS);
-//                        break;
-//                    default:
-//                        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
-//                        grantedQoSList.add(FAILURE.value());
-//                        break;
-//                }
-//            } catch (AdaptorException e) {
-//                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
-//                grantedQoSList.add(FAILURE.value());
-//            }
-//        }
-//        ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
-//    }
-//
-//    private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
-//        grantedQoSList.add(getMinSupportedQos(reqQoS));
-//        mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
-//    }
-//
-//    private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
-//        if (!checkConnected(ctx)) {
-//            return;
-//        }
-//        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
-//        for (String topicName : mqttMsg.payload().topics()) {
-//            mqttQoSMap.remove(topicName);
-//            try {
-//                switch (topicName) {
+                    case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+                        deviceSessionCtx.setAllowAttributeResponses();
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    default:
+                        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+                        grantedQoSList.add(FAILURE.value());
+                        break;
+                }
+            } catch (Exception e) {
+                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+                grantedQoSList.add(FAILURE.value());
+            }
+        }
+        ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
+    }
+
+    private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
+        grantedQoSList.add(getMinSupportedQos(reqQoS));
+        mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
+    }
+
+    private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
+        if (!checkConnected(ctx)) {
+            return;
+        }
+        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+        for (String topicName : mqttMsg.payload().topics()) {
+            mqttQoSMap.remove(topicName);
+            try {
+                switch (topicName) {
 //                    case DEVICE_ATTRIBUTES_TOPIC: {
 //                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
 //                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
@@ -308,23 +347,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 //                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
 //                        break;
 //                    }
-//                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-//                        deviceSessionCtx.setDisallowAttributeResponses();
-//                        break;
-//                }
-//            } catch (AdaptorException e) {
-//                log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
-//            }
-//        }
-//        ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
-//    }
-//
-//    private MqttMessage createUnSubAckMessage(int msgId) {
-//        MqttFixedHeader mqttFixedHeader =
-//                new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
-//        MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
-//        return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
-//    }
+                    case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+                        deviceSessionCtx.setDisallowAttributeResponses();
+                        break;
+                }
+            } catch (Exception e) {
+                log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
+            }
+        }
+        ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
+    }
+
+    private MqttMessage createUnSubAckMessage(int msgId) {
+        MqttFixedHeader mqttFixedHeader =
+                new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
+        MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
+        return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
+    }
 
     private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
         log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
@@ -346,15 +385,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                     new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                         @Override
                         public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
-                            if (!msg.hasDeviceInfo()) {
-                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-                                ctx.close();
-                            } else {
-                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-                                deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
-                                transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
-                                checkGatewaySession();
-                            }
+                            onValidateDeviceResponse(msg, ctx);
                         }
 
                         @Override
@@ -375,15 +406,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                     new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                         @Override
                         public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
-                            if (!msg.hasDeviceInfo()) {
-                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-                                ctx.close();
-                            } else {
-                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-                                deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
-                                transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
-                                checkGatewaySession();
-                            }
+                            onValidateDeviceResponse(msg, ctx);
                         }
 
                         @Override
@@ -415,7 +438,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private void processDisconnect(ChannelHandlerContext ctx) {
         ctx.close();
         if (deviceSessionCtx.isConnected()) {
-            transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.deregisterSession(sessionInfo);
             if (gatewaySessionCtx != null) {
                 gatewaySessionCtx.onGatewayDisconnect();
             }
@@ -488,16 +512,46 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private SessionEventMsg getSessionEventMsg(SessionEvent event) {
         return SessionEventMsg.newBuilder()
-                .setSessionInfo(sessionInfo)
-                .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
-                .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
+                .setSessionType(TransportProtos.SessionType.ASYNC)
                 .setEvent(event).build();
     }
 
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
         if (deviceSessionCtx.isConnected()) {
-            transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.deregisterSession(sessionInfo);
+        }
+    }
+
+    private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
+        if (!msg.hasDeviceInfo()) {
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+            ctx.close();
+        } else {
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+            deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
+            sessionInfo = SessionInfoProto.newBuilder()
+                    .setNodeId(context.getNodeId())
+                    .setSessionIdMSB(sessionId.getMostSignificantBits())
+                    .setSessionIdLSB(sessionId.getLeastSignificantBits())
+                    .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
+                    .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
+                    .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
+                    .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
+                    .build();
+            transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
+            transportService.registerSession(sessionInfo, this);
+            checkGatewaySession();
+        }
+    }
+
+    @Override
+    public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
+        try {
+            adaptor.convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
+        } catch (Exception e) {
+            log.trace("[{}] Failed to convert device attributes to MQTT msg", sessionId, e);
         }
     }
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index 6661cb1..d704634 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.session;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.mqtt.*;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
@@ -41,12 +42,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
 
     private final MqttSessionId sessionId;
+    @Getter
     private ChannelHandlerContext channel;
     private volatile boolean allowAttributeResponses;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
     public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
-        super(null, null, null);
+        super(null, null, mqttQoSMap);
         this.sessionId = new MqttSessionId();
     }
 
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 76c1c1b..7a2f698 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -111,8 +111,8 @@ public class MqttTransportService implements TransportService {
         TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
         responseBuilder.settings(kafkaSettings);
         responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
-        responseBuilder.clientId(transportContext.getNodeId());
-        responseBuilder.groupId(null);
+        responseBuilder.clientId("transport-api-client-" + transportContext.getNodeId());
+        responseBuilder.groupId("transport-api-client");
         responseBuilder.autoCommit(true);
         responseBuilder.autoCommitIntervalMs(autoCommitInterval);
         responseBuilder.decoder(new TransportApiResponseDecoder());
@@ -137,8 +137,8 @@ public class MqttTransportService implements TransportService {
         TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
         mainConsumerBuilder.settings(kafkaSettings);
         mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId());
-        mainConsumerBuilder.clientId(transportContext.getNodeId());
-        mainConsumerBuilder.groupId(null);
+        mainConsumerBuilder.clientId("transport-" + transportContext.getNodeId());
+        mainConsumerBuilder.groupId("transport");
         mainConsumerBuilder.autoCommit(true);
         mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
         mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
@@ -243,6 +243,15 @@ public class MqttTransportService implements TransportService {
     }
 
     @Override
+    public void process(SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setGetAttributes(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
+
+    @Override
     public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
         sessions.putIfAbsent(toId(sessionInfo), listener);
         //TODO: monitor sessions periodically: PING REQ/RESP, etc.
@@ -271,9 +280,13 @@ public class MqttTransportService implements TransportService {
         @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception == null) {
-                callback.onSuccess(null);
+                if (callback != null) {
+                    callback.onSuccess(null);
+                }
             } else {
-                callback.onError(exception);
+                if (callback != null) {
+                    callback.onError(exception);
+                }
             }
         }
     }
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
index ee929ce..179a2a1 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.