thingsboard-developers

WIP: Implementation

10/9/2018 8:16:08 AM

Changes

application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java 122(+0 -122)

application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java 156(+0 -156)

application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java 143(+0 -143)

application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java 180(+0 -180)

application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java 95(+0 -95)

common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java 107(+0 -107)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 8930f84..8ff61ac 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -31,6 +31,7 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 import org.thingsboard.rule.engine.api.MailService;
 import org.thingsboard.server.actors.service.ActorService;
@@ -69,6 +70,7 @@ import org.thingsboard.server.service.script.JsExecutorService;
 import org.thingsboard.server.service.script.JsInvokeService;
 import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
+import org.thingsboard.server.service.transport.RuleEngineTransportService;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -204,6 +206,11 @@ public class ActorSystemContext {
     @Getter
     private DeviceStateService deviceStateService;
 
+    @Lazy
+    @Autowired
+    @Getter
+    private RuleEngineTransportService ruleEngineTransportService;
+
     @Value("${cluster.partition_id}")
     @Getter
     private long queuePartitionId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 6a78f78..14ca586 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -39,7 +39,6 @@ import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.model.ModelConstants;
@@ -105,7 +104,7 @@ public class AppActor extends RuleChainManagerActor {
             case SERVICE_TO_RULE_ENGINE_MSG:
                 onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
                 break;
-            case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
+            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
             case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
             case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
             case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
@@ -169,16 +168,6 @@ public class AppActor extends RuleChainManagerActor {
         getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender());
     }
 
-    private void processDeviceMsg(DeviceToDeviceActorMsg deviceToDeviceActorMsg) {
-        TenantId tenantId = deviceToDeviceActorMsg.getTenantId();
-        ActorRef tenantActor = getOrCreateTenantActor(tenantId);
-        if (deviceToDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) {
-//            tenantActor.tell(new RuleChainDeviceMsg(deviceToDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
-        } else {
-            tenantActor.tell(deviceToDeviceActorMsg, context().self());
-        }
-    }
-
     private ActorRef getOrCreateTenantActor(TenantId tenantId) {
         return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
                 .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()));
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 99d0045..7fabe49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -26,12 +26,12 @@ import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
 
 public class DeviceActor extends ContextAwareActor {
 
@@ -50,8 +50,8 @@ public class DeviceActor extends ContextAwareActor {
             case CLUSTER_EVENT_MSG:
                 processor.processClusterEventMsg((ClusterEventMsg) msg);
                 break;
-            case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
-                processor.process(context(), (DeviceToDeviceActorMsg) msg);
+            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
+                processor.process(context(), (TransportToDeviceActorMsgWrapper) msg);
                 break;
             case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
                 processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index a2ea048..1313f60 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -61,7 +61,6 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@@ -71,9 +70,11 @@ import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -92,6 +93,8 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+
 /**
  * @author Andrew Shvayka
  */
@@ -99,12 +102,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     private final TenantId tenantId;
     private final DeviceId deviceId;
-    private final Map<SessionId, SessionInfo> sessions;
-    private final Map<SessionId, SessionInfo> attributeSubscriptions;
-    private final Map<SessionId, SessionInfo> rpcSubscriptions;
+    private final Map<UUID, SessionInfo> sessions;
+    private final Map<UUID, SessionInfo> attributeSubscriptions;
+    private final Map<UUID, SessionInfo> rpcSubscriptions;
     private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
     private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
-    private final Map<UUID, PendingSessionMsgData> pendingMsgs;
 
     private final Gson gson = new Gson();
     private final JsonParser jsonParser = new JsonParser();
@@ -123,7 +125,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         this.rpcSubscriptions = new HashMap<>();
         this.toDeviceRpcPendingMap = new HashMap<>();
         this.toServerRpcPendingMap = new HashMap<>();
-        this.pendingMsgs = new HashMap<>();
         initAttributes();
     }
 
@@ -154,11 +155,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         boolean sent = rpcSubscriptions.size() > 0;
         Set<SessionId> syncSessionSet = new HashSet<>();
         rpcSubscriptions.entrySet().forEach(sub -> {
-            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
-            sendMsgToSessionActor(response, sub.getValue().getServer());
-            if (SessionType.SYNC == sub.getValue().getType()) {
-                syncSessionSet.add(sub.getKey());
-            }
+//            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
+//            sendMsgToSessionActor(response, sub.getValue().getServer());
+//            if (SessionType.SYNC == sub.getValue().getType()) {
+//                syncSessionSet.add(sub.getKey());
+//            }
         });
         syncSessionSet.forEach(rpcSubscriptions::remove);
 
@@ -191,15 +192,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    void processQueueTimeout(ActorContext context, DeviceActorQueueTimeoutMsg msg) {
-        PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
-        if (data != null) {
-            logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
-            ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
-            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
-        }
-    }
-
     void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
         PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
         if (data != null && data.isReplyOnQueueAck()) {
@@ -252,31 +244,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         };
     }
 
-    void process(ActorContext context, DeviceToDeviceActorMsg msg) {
-        processSubscriptionCommands(context, msg);
-        processRpcResponses(context, msg);
-        processSessionStateMsgs(msg);
+    void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
+        TransportToDeviceActorMsg msg = wrapper.getMsg();
+//        processSubscriptionCommands(context, msg);
+//        processRpcResponses(context, msg);
+        if (msg.hasSessionEvent()) {
+            processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
+        }
 
-        SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
-        if (sessionMsgType.requiresRulesProcessing()) {
-            switch (sessionMsgType) {
-                case GET_ATTRIBUTES_REQUEST:
-                    handleGetAttributesRequest(msg);
-                    break;
-                case POST_ATTRIBUTES_REQUEST:
-                    handlePostAttributesRequest(context, msg);
-                    reportActivity();
-                    break;
-                case POST_TELEMETRY_REQUEST:
-                    handlePostTelemetryRequest(context, msg);
-                    reportActivity();
-                    break;
-                case TO_SERVER_RPC_REQUEST:
-                    handleClientSideRPCRequest(context, msg);
-                    reportActivity();
-                    break;
-            }
+        if (msg.hasPostAttributes()) {
+            handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
+            reportActivity();
         }
+        if (msg.hasPostTelemetry()) {
+            handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
+            reportActivity();
+        }
+        if (msg.hasGetAttributes()) {
+            handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
+        }
+//        SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
+//        if (sessionMsgType.requiresRulesProcessing()) {
+//            switch (sessionMsgType) {
+//                case GET_ATTRIBUTES_REQUEST:
+//                    handleGetAttributesRequest(msg);
+//                    break;
+//                case TO_SERVER_RPC_REQUEST:
+//                    handleClientSideRPCRequest(context, msg);
+//                    reportActivity();
+//                    break;
+//            }
+//        }
     }
 
     private void reportActivity() {
@@ -291,6 +289,39 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
     }
 
+    private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
+        ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
+        ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
+
+        Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
+            @Override
+            public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
+                systemContext.getRuleEngineTransportService().process();
+                BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
+                        request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (t instanceof Exception) {
+                    ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
+                    sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
+                } else {
+                    logger.error("[{}] Failed to process attributes request", deviceId, t);
+                }
+            }
+        });
+    }
+
+    private Optional<Set<String>> toOptionalSet(List<String> strings) {
+        if (strings == null || strings.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new HashSet<>(strings));
+        }
+    }
+
     private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
         GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
         ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
@@ -328,43 +359,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void handlePostAttributesRequest(ActorContext context, DeviceToDeviceActorMsg src) {
-        AttributesUpdateRequest request = (AttributesUpdateRequest) src.getPayload();
-
-        JsonObject json = new JsonObject();
-        for (AttributeKvEntry kv : request.getAttributes()) {
-            kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-            kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-            kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-            kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-        }
-
-        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
-        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
-                SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
-        pushToRuleEngineWithTimeout(context, tbMsg, msgData);
+    private void handlePostAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, PostAttributeMsg postAttributes) {
+        JsonObject json = getJsonObject(postAttributes.getKvList());
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(),
+                TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+        pushToRuleEngine(context, tbMsg);
     }
 
-    private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
-        TelemetryUploadRequest request = (TelemetryUploadRequest) src.getPayload();
-
-        Map<Long, List<KvEntry>> tsData = request.getData();
-
-        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
-                SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size());
-
-        for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
-            JsonObject json = new JsonObject();
-            for (KvEntry kv : entry.getValue()) {
-                kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-                kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-                kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-                kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
-            }
+    private void handlePostTelemetryRequest(ActorContext context, SessionInfoProto sessionInfo, PostTelemetryMsg postTelemetry) {
+        for (TsKvListProto tsKv : postTelemetry.getTsKvListList()) {
+            JsonObject json = getJsonObject(tsKv.getKvList());
             TbMsgMetaData metaData = defaultMetaData.copy();
-            metaData.putValue("ts", entry.getKey() + "");
+            metaData.putValue("ts", tsKv.getTs() + "");
             TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
-            pushToRuleEngineWithTimeout(context, tbMsg, msgData);
+            pushToRuleEngine(context, tbMsg);
         }
     }
 
@@ -401,16 +409,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) {
-        SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType();
-        int requestId = pendingMsgData.getRequestId();
-        if (systemContext.isQueuePersistenceEnabled()) {
-            pendingMsgs.put(tbMsg.getId(), pendingMsgData);
-            scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
-        } else {
-            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
-            sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
-        }
+    private void pushToRuleEngine(ActorContext context, TbMsg tbMsg) {
         context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
     }
 
@@ -497,13 +496,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void processSessionStateMsgs(DeviceToDeviceActorMsg msg) {
-        SessionId sessionId = msg.getSessionId();
-        FromDeviceMsg inMsg = msg.getPayload();
-        if (inMsg instanceof SessionOpenMsg) {
+    private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
+        UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+        if (msg.getEvent() == SessionEvent.OPEN) {
             logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
             if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
-                SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
+                UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
                 if (sessionIdToRemove != null) {
                     closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
                 }
@@ -512,6 +510,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             if (sessions.size() == 1) {
                 reportSessionOpen();
             }
+        }
+        FromDeviceMsg inMsg = msg.getPayload();
+        if (inMsg instanceof SessionOpenMsg) {
+            logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
         } else if (inMsg instanceof SessionCloseMsg) {
             logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
             sessions.remove(sessionId);
@@ -540,8 +542,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         rpcSubscriptions.clear();
     }
 
-    private void closeSession(SessionId sessionId, SessionInfo sessionInfo) {
-        sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer());
+    private void closeSession(UUID sessionId, SessionInfo sessionInfo) {
+        DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
+                .setSessionIdMSB(sessionId.getMostSignificantBits())
+                .setSessionIdLSB(sessionId.getLeastSignificantBits())
+                .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
+        systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
     }
 
     void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
@@ -552,4 +558,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         this.defaultMetaData.putValue("deviceType", deviceType);
     }
 
+    private JsonObject getJsonObject(List<KeyValueProto> tsKv) {
+        JsonObject json = new JsonObject();
+        for (KeyValueProto kv : tsKv) {
+            switch (kv.getType()) {
+                case BOOLEAN_V:
+                    json.addProperty(kv.getKey(), kv.getBoolV());
+                    break;
+                case LONG_V:
+                    json.addProperty(kv.getKey(), kv.getLongV());
+                    break;
+                case DOUBLE_V:
+                    json.addProperty(kv.getKey(), kv.getDoubleV());
+                    break;
+                case STRING_V:
+                    json.addProperty(kv.getKey(), kv.getStringV());
+                    break;
+            }
+        }
+        return json;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
index 23ad966..dfa07cf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
 
 import java.util.Optional;
+import java.util.UUID;
 
 /**
  * Created by ashvayka on 17.04.18.
@@ -30,7 +31,7 @@ import java.util.Optional;
 @AllArgsConstructor
 public final class PendingSessionMsgData {
 
-    private final SessionId sessionId;
+    private final UUID sessionId;
     private final Optional<ServerAddress> serverAddress;
     private final SessionMsgType sessionMsgType;
     private final int requestId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 04c457c..9faaade 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,10 +16,7 @@
 package org.thingsboard.server.actors.device;
 
 import lombok.Data;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.session.SessionType;
-
-import java.util.Optional;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
 
 /**
  * @author Andrew Shvayka
@@ -27,5 +24,6 @@ import java.util.Optional;
 @Data
 public class SessionInfo {
     private final SessionType type;
-    private final Optional<ServerAddress> server;
+    private final String nodeId;
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 460b64c..347483a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -87,7 +87,7 @@ public class TenantActor extends RuleChainManagerActor {
             case DEVICE_ACTOR_TO_RULE_ENGINE_MSG:
                 onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
                 break;
-            case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
+            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
             case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
             case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
             case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
new file mode 100644
index 0000000..3ee1858
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
@@ -0,0 +1,36 @@
+package org.thingsboard.server.service.transport.msg;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
+import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 09.10.18.
+ */
+@Data
+public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable {
+
+    private final TenantId tenantId;
+    private final DeviceId deviceId;
+    private final TransportToDeviceActorMsg msg;
+
+    public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg) {
+        this.msg = msg;
+        this.tenantId = new TenantId(new UUID(msg.getSessionInfo().getTenantIdMSB(), msg.getSessionInfo().getTenantIdLSB()));
+        this.deviceId = new DeviceId(new UUID(msg.getSessionInfo().getDeviceIdMSB(), msg.getSessionInfo().getDeviceIdLSB()));
+    }
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
new file mode 100644
index 0000000..6b82792
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -0,0 +1,187 @@
+package org.thingsboard.server.service.transport;
+
+import akka.actor.ActorRef;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 09.10.18.
+ */
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
+public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Value("${transport.remote.rule_engine.topic}")
+    private String ruleEngineTopic;
+    @Value("${transport.remote.notifications.topic}")
+    private String notificationsTopic;
+    @Value("${transport.remote.rule_engine.poll_interval}")
+    private int pollDuration;
+    @Value("${transport.remote.rule_engine.auto_commit_interval}")
+    private int autoCommitInterval;
+
+    @Autowired
+    private TbKafkaSettings kafkaSettings;
+
+    @Autowired
+    private DiscoveryService discoveryService;
+
+    @Autowired
+    private ActorSystemContext actorContext;
+
+    @Autowired
+    private ActorService actorService;
+
+    //TODO: completely replace this routing with the Kafka routing by partition ids.
+    @Autowired
+    private ClusterRoutingService routingService;
+    @Autowired
+    private ClusterRpcService rpcService;
+    @Autowired
+    private DataDecodingEncodingService encodingService;
+
+    private TBKafkaConsumerTemplate<ToRuleEngineMsg> ruleEngineConsumer;
+    private TBKafkaProducerTemplate<ToTransportMsg> notificationsProducer;
+
+    private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
+
+    private volatile boolean stopped = false;
+
+    @PostConstruct
+    public void init() {
+        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
+        notificationsProducerBuilder.settings(kafkaSettings);
+        notificationsProducerBuilder.defaultTopic(notificationsTopic);
+        notificationsProducerBuilder.encoder(new ToTransportMsgEncoder());
+
+        notificationsProducer = notificationsProducerBuilder.build();
+        notificationsProducer.init();
+
+        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToRuleEngineMsg> ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder();
+        ruleEngineConsumerBuilder.settings(kafkaSettings);
+        ruleEngineConsumerBuilder.topic(ruleEngineTopic);
+        ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId());
+        ruleEngineConsumerBuilder.groupId("tb-node");
+        ruleEngineConsumerBuilder.autoCommit(true);
+        ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
+        ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder());
+
+        ruleEngineConsumer = ruleEngineConsumerBuilder.build();
+        ruleEngineConsumer.subscribe();
+
+        mainConsumerExecutor.execute(() -> {
+            while (!stopped) {
+                try {
+                    ConsumerRecords<String, byte[]> records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration));
+                    records.forEach(record -> {
+                        try {
+                            ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
+                            if (toRuleEngineMsg.hasToDeviceActorMsg()) {
+                                forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
+                            }
+                        } catch (Throwable e) {
+                            log.warn("Failed to process the notification.", e);
+                        }
+                    });
+                } catch (Exception e) {
+                    log.warn("Failed to obtain messages from queue.", e);
+                    try {
+                        Thread.sleep(pollDuration);
+                    } catch (InterruptedException e2) {
+                        log.trace("Failed to wait until the server has capacity to handle new requests", e2);
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void process(String nodeId, DeviceActorToTransportMsg msg) {
+        process(nodeId, msg, null, null);
+    }
+
+    @Override
+    public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
+        notificationsProducer.send(notificationsTopic + "." + nodeId,
+                ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
+                , new QueueCallbackAdaptor(onSuccess, onFailure));
+    }
+
+    private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
+        TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
+        Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
+        if (address.isPresent()) {
+            rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
+        } else {
+            actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        stopped = true;
+        if (ruleEngineConsumer != null) {
+            ruleEngineConsumer.unsubscribe();
+        }
+        if (mainConsumerExecutor != null) {
+            mainConsumerExecutor.shutdownNow();
+        }
+    }
+
+    private static class QueueCallbackAdaptor implements Callback {
+        private final Runnable onSuccess;
+        private final Consumer<Throwable> onFailure;
+
+        QueueCallbackAdaptor(Runnable onSuccess, Consumer<Throwable> onFailure) {
+            this.onSuccess = onSuccess;
+            this.onFailure = onFailure;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception == null) {
+                if (onSuccess != null) {
+                    onSuccess.run();
+                }
+            } else {
+                if (onFailure != null) {
+                    onFailure.accept(exception);
+                }
+            }
+        }
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
new file mode 100644
index 0000000..e6b3dd3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public interface RuleEngineTransportService {
+
+    void process(String nodeId, DeviceActorToTransportMsg msg);
+
+    void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure);
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
new file mode 100644
index 0000000..5f3c026
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class ToTransportMsgEncoder implements TbKafkaEncoder<ToTransportMsg> {
+    @Override
+    public byte[] encode(ToTransportMsg value) {
+        return value.toByteArray();
+    }
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index c58f3a1..923976c 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -462,4 +462,8 @@ transport:
       request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
       request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}"
     rule_engine:
-      topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
\ No newline at end of file
+      topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
+      poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}"
+      auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
+    notifications:
+      topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
\ No newline at end of file
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 60e5469..dfd8f98 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -77,11 +77,6 @@ public enum MsgType {
      */
     RULE_TO_SELF_MSG,
 
-    /**
-     * Message that is sent by Session Actor to Device Actor. Represents messages from the device itself.
-     */
-    DEVICE_SESSION_TO_DEVICE_ACTOR_MSG,
-
     DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG,
 
     DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG,
@@ -111,6 +106,12 @@ public enum MsgType {
     TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
     SESSION_TIMEOUT_MSG,
     SESSION_CTRL_MSG,
-    STATS_PERSIST_TICK_MSG;
+    STATS_PERSIST_TICK_MSG,
+
+
+    /**
+     * Message that is sent by TransportRuleEngineService to Device Actor. Represents messages from the device itself.
+     */
+    TRANSPORT_TO_DEVICE_ACTOR_MSG;
 
 }
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 3adb1c3..90f15a4 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -49,7 +49,9 @@ public class TBKafkaConsumerTemplate<T> {
                                     boolean autoCommit, int autoCommitIntervalMs) {
         Properties props = settings.toProps();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        if (groupId != null) {
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        }
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 1e109d2..8f4c095 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -89,28 +90,28 @@ public class TBKafkaProducerTemplate<T> {
         }
     }
 
-    public Future<RecordMetadata> send(String key, T value) {
-        return send(key, value, null, null);
+    public Future<RecordMetadata> send(String key, T value, Callback callback) {
+        return send(key, value, null, callback);
     }
 
-    public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers) {
-        return send(key, value, null, headers);
+    public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers, Callback callback) {
+        return send(key, value, null, headers, callback);
     }
 
-    public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers) {
-        return send(this.defaultTopic, key, value, timestamp, headers);
+    public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
+        return send(this.defaultTopic, key, value, timestamp, headers, callback);
     }
 
-    public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers) {
-        return send(topic, key, value, null, headers);
+    public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers, Callback callback) {
+        return send(topic, key, value, null, headers, callback);
     }
 
-    public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
+    public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
         byte[] data = encoder.encode(value);
         ProducerRecord<String, byte[]> record;
         Integer partition = getPartition(topic, key, value, data);
         record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers);
-        return producer.send(record);
+        return producer.send(record, callback);
     }
 
     private Integer getPartition(String topic, String key, T value, byte[] data) {
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 0dbf45d..6f9e8a7 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -77,55 +77,64 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
         requestTemplate.subscribe();
         loopExecutor.submit(() -> {
             while (!stopped) {
-                while (pendingRequestCount.get() >= maxPendingRequests) {
+                try {
+                    while (pendingRequestCount.get() >= maxPendingRequests) {
+                        try {
+                            Thread.sleep(pollInterval);
+                        } catch (InterruptedException e) {
+                            log.trace("Failed to wait until the server has capacity to handle new requests", e);
+                        }
+                    }
+                    ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
+                    requests.forEach(request -> {
+                        Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+                        if (requestIdHeader == null) {
+                            log.error("[{}] Missing requestId in header", request);
+                            return;
+                        }
+                        UUID requestId = bytesToUuid(requestIdHeader.value());
+                        if (requestId == null) {
+                            log.error("[{}] Missing requestId in header and body", request);
+                            return;
+                        }
+                        Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
+                        if (responseTopicHeader == null) {
+                            log.error("[{}] Missing response topic in header", request);
+                            return;
+                        }
+                        String responseTopic = bytesToString(responseTopicHeader.value());
+                        try {
+                            pendingRequestCount.getAndIncrement();
+                            Request decodedRequest = requestTemplate.decode(request);
+                            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
+                                    response -> {
+                                        pendingRequestCount.decrementAndGet();
+                                        reply(requestId, responseTopic, response);
+                                    },
+                                    e -> {
+                                        pendingRequestCount.decrementAndGet();
+                                        if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
+                                            log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
+                                        } else {
+                                            log.trace("[{}] Failed to process the request: {}", requestId, request, e);
+                                        }
+                                    },
+                                    requestTimeout,
+                                    timeoutExecutor,
+                                    callbackExecutor);
+                        } catch (Throwable e) {
+                            pendingRequestCount.decrementAndGet();
+                            log.warn("[{}] Failed to process the request: {}", requestId, request, e);
+                        }
+                    });
+                } catch (Throwable e) {
+                    log.warn("Failed to obtain messages from queue.", e);
                     try {
                         Thread.sleep(pollInterval);
-                    } catch (InterruptedException e) {
-                        log.trace("Failed to wait until the server has capacity to handle new requests", e);
+                    } catch (InterruptedException e2) {
+                        log.trace("Failed to wait until the server has capacity to handle new requests", e2);
                     }
                 }
-                ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
-                requests.forEach(request -> {
-                    Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
-                    if (requestIdHeader == null) {
-                        log.error("[{}] Missing requestId in header", request);
-                        return;
-                    }
-                    UUID requestId = bytesToUuid(requestIdHeader.value());
-                    if (requestId == null) {
-                        log.error("[{}] Missing requestId in header and body", request);
-                        return;
-                    }
-                    Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
-                    if (responseTopicHeader == null) {
-                        log.error("[{}] Missing response topic in header", request);
-                        return;
-                    }
-                    String responseTopic = bytesToString(responseTopicHeader.value());
-                    try {
-                        pendingRequestCount.getAndIncrement();
-                        Request decodedRequest = requestTemplate.decode(request);
-                        AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
-                                response -> {
-                                    pendingRequestCount.decrementAndGet();
-                                    reply(requestId, responseTopic, response);
-                                },
-                                e -> {
-                                    pendingRequestCount.decrementAndGet();
-                                    if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
-                                        log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
-                                    } else {
-                                        log.trace("[{}] Failed to process the request: {}", requestId, request, e);
-                                    }
-                                },
-                                requestTimeout,
-                                timeoutExecutor,
-                                callbackExecutor);
-                    } catch (Throwable e) {
-                        pendingRequestCount.decrementAndGet();
-                        log.warn("[{}] Failed to process the request: {}", requestId, request, e);
-                    }
-                });
             }
         });
     }
@@ -141,7 +150,7 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
     }
 
     private void reply(UUID requestId, String topic, Response response) {
-        responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
+        responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))), null);
     }
 
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 7b2c05e..3068974 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,17 +16,12 @@
 package org.thingsboard.server.common.transport.session;
 
 import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
-import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
+import lombok.Getter;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.msg.session.SessionContext;
-import org.thingsboard.server.common.transport.SessionMsgProcessor;
-import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
-import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
 
-import java.util.Optional;
+import java.util.UUID;
 
 /**
  * @author Andrew Shvayka
@@ -34,7 +29,9 @@ import java.util.Optional;
 @Data
 public abstract class DeviceAwareSessionContext implements SessionContext {
 
-    private volatile TransportProtos.DeviceInfoProto deviceInfo;
+    @Getter
+    private volatile DeviceId deviceId;
+    private volatile DeviceInfoProto deviceInfo;
 
     public long getDeviceIdMSB() {
         return deviceInfo.getDeviceIdMSB();
@@ -44,6 +41,16 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
         return deviceInfo.getDeviceIdLSB();
     }
 
+    public DeviceId getDeviceId() {
+        return deviceId;
+    }
+
+    public void setDeviceInfo(DeviceInfoProto deviceInfo) {
+        this.deviceInfo = deviceInfo;
+        this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
+    }
+
+
     public boolean isConnected() {
         return deviceInfo != null;
     }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
new file mode 100644
index 0000000..d0e6d18
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+public interface SessionMsgListener {
+
+    void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 84d34e1..dc2e306 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,23 +15,33 @@
  */
 package org.thingsboard.server.common.transport;
 
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
 
 /**
  * Created by ashvayka on 04.10.18.
  */
 public interface TransportService {
 
-    void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
-                 TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
+    void process(ValidateDeviceTokenRequestMsg msg,
+                 TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
 
-    void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg,
-                 TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
+    void process(ValidateDeviceX509CertRequestMsg msg,
+                 TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
 
-    void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
+    void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
 
-    void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
+    void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
 
-    void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+    void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+
+    void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
+
+    void deregisterSession(SessionInfoProto sessionInfo);
 
 }
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index e78f873..f3b3ae8 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -23,9 +23,12 @@ option java_outer_classname = "TransportProtos";
  * Data Structures;
  */
 message SessionInfoProto {
-  string nodeId = 1;
-  int64 sessionIdMSB = 2;
-  int64 sessionIdLSB = 3;
+  int64 sessionIdMSB = 1;
+  int64 sessionIdLSB = 2;
+  int64 tenantIdMSB = 3;
+  int64 tenantIdLSB = 4;
+  int64 deviceIdMSB = 5;
+  int64 deviceIdLSB = 6;
 }
 
 enum SessionEvent {
@@ -33,12 +36,25 @@ enum SessionEvent {
   CLOSED = 1;
 }
 
+enum SessionType {
+  SYNC = 0;
+  ASYNC = 1;
+}
+
+enum KeyValueType {
+  BOOLEAN_V = 0;
+  LONG_V = 1;
+  DOUBLE_V = 2;
+  STRING_V = 3;
+}
+
 message KeyValueProto {
   string key = 1;
-  bool bool_v = 2;
-  int64 long_v = 3;
-  double double_v = 4;
-  string string_v = 5;
+  KeyValueType type = 2;
+  bool bool_v = 3;
+  int64 long_v = 4;
+  double double_v = 5;
+  string string_v = 6;
 }
 
 message TsKvListProto {
@@ -60,33 +76,28 @@ message DeviceInfoProto {
  * Messages that use Data Structures;
  */
 message SessionEventMsg {
-  SessionInfoProto sessionInfo = 1;
-  int64 deviceIdMSB = 2;
-  int64 deviceIdLSB = 3;
-  SessionEvent event = 4;
+  string nodeId = 1;
+  SessionType sessionType = 2;
+  SessionEvent event = 3;
 }
 
 message PostTelemetryMsg {
-  SessionInfoProto sessionInfo = 1;
-  repeated TsKvListProto tsKvList = 2;
+  repeated TsKvListProto tsKvList = 1;
 }
 
 message PostAttributeMsg {
-  SessionInfoProto sessionInfo = 1;
-  repeated TsKvListProto tsKvList = 2;
+  repeated KeyValueProto kv = 1;
 }
 
 message GetAttributeRequestMsg {
-  SessionInfoProto sessionInfo = 1;
-  repeated string clientAttributeNames = 2;
-  repeated string sharedAttributeNames = 3;
+  repeated string clientAttributeNames = 1;
+  repeated string sharedAttributeNames = 2;
 }
 
 message GetAttributeResponseMsg {
-  SessionInfoProto sessionInfo = 1;
-  repeated TsKvListProto clientAttributeList = 2;
-  repeated TsKvListProto sharedAttributeList = 3;
-  repeated string deletedAttributeKeys = 4;
+  repeated TsKvListProto clientAttributeList = 1;
+  repeated TsKvListProto sharedAttributeList = 2;
+  repeated string deletedAttributeKeys = 3;
 }
 
 message ValidateDeviceTokenRequestMsg {
@@ -101,11 +112,34 @@ message ValidateDeviceCredentialsResponseMsg {
   DeviceInfoProto deviceInfo = 1;
 }
 
+message SessionCloseNotificationProto {
+  string message = 1;
+}
+
+message TransportToDeviceActorMsg {
+  SessionInfoProto sessionInfo = 1;
+  SessionEventMsg sessionEvent = 2;
+  PostTelemetryMsg postTelemetry = 3;
+  PostAttributeMsg postAttributes = 4;
+  GetAttributeRequestMsg getAttributes = 5;
+}
+
+message DeviceActorToTransportMsg {
+   int64 sessionIdMSB = 1;
+   int64 sessionIdLSB = 2;
+   SessionCloseNotificationProto sessionCloseNotification = 3;
+   GetAttributeResponseMsg getAttributesResponse = 4;
+}
+
 /**
  * Main messages;
  */
-message TransportToRuleEngineMsg {
+message ToRuleEngineMsg {
+  TransportToDeviceActorMsg toDeviceActorMsg = 1;
+}
 
+message ToTransportMsg {
+  DeviceActorToTransportMsg toDeviceSessionMsg = 1;
 }
 
 message TransportApiRequestMsg {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 23bc4cc..d8f1d10 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,32 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
 import io.netty.handler.codec.mqtt.MqttPubAckMessage;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
 import io.netty.handler.codec.mqtt.MqttSubAckPayload;
-import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
-import io.netty.handler.codec.mqtt.MqttTopicSubscription;
-import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.StringUtils;
-import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
-import org.thingsboard.server.common.data.security.DeviceX509Credentials;
-import org.thingsboard.server.common.msg.core.SessionOpenMsg;
-import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
-import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.quota.QuotaService;
 import org.thingsboard.server.dao.EncryptionUtil;
-import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
@@ -61,49 +54,19 @@ import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.security.cert.X509Certificate;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.thingsboard.server.gen.transport.TransportProtos.*;
-
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
-import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
-import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
 import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
 import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
-import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
 
 /**
  * @author Andrew Shvayka
@@ -389,7 +352,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                             } else {
                                 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
                                 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
-                                transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+                                transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
                                 checkGatewaySession();
                             }
                         }
@@ -418,7 +381,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                             } else {
                                 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
                                 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
-                                transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+                                transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
                                 checkGatewaySession();
                             }
                         }
@@ -452,7 +415,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private void processDisconnect(ChannelHandlerContext ctx) {
         ctx.close();
         if (deviceSessionCtx.isConnected()) {
-            transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
             if (gatewaySessionCtx != null) {
                 gatewaySessionCtx.onGatewayDisconnect();
             }
@@ -534,7 +497,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
         if (deviceSessionCtx.isConnected()) {
-            transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
         }
     }
 }
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index 4d945de..76c1c1b 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,41 @@
  */
 package org.thingsboard.server.mqtt.service;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.SessionMsgListener;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
 import org.thingsboard.server.kafka.AsyncCallbackTemplate;
 import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
 import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
-import org.thingsboard.server.gen.transport.TransportProtos.*;
 import org.thingsboard.server.kafka.TbKafkaSettings;
 import org.thingsboard.server.transport.mqtt.MqttTransportContext;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -37,10 +57,17 @@ import java.util.concurrent.Executors;
  * Created by ashvayka on 05.10.18.
  */
 @Service
+@Slf4j
 public class MqttTransportService implements TransportService {
 
     @Value("${kafka.rule_engine.topic}")
     private String ruleEngineTopic;
+    @Value("${kafka.notifications.topic}")
+    private String notificationsTopic;
+    @Value("${kafka.notifications.poll_interval}")
+    private int notificationsPollDuration;
+    @Value("${kafka.notifications.auto_commit_interval}")
+    private int notificationsAutoCommitInterval;
     @Value("${kafka.transport_api.requests_topic}")
     private String transportApiRequestsTopic;
     @Value("${kafka.transport_api.responses_topic}")
@@ -54,6 +81,8 @@ public class MqttTransportService implements TransportService {
     @Value("${kafka.transport_api.response_auto_commit_interval}")
     private int autoCommitInterval;
 
+    private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
+
     @Autowired
     private TbKafkaSettings kafkaSettings;
     //We use this to get the node id. We should replace this with a component that provides the node id.
@@ -63,6 +92,12 @@ public class MqttTransportService implements TransportService {
     private ExecutorService transportCallbackExecutor;
 
     private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
+    private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
+    private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
+
+    private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
+
+    private volatile boolean stopped = false;
 
     @PostConstruct
     public void init() {
@@ -77,7 +112,7 @@ public class MqttTransportService implements TransportService {
         responseBuilder.settings(kafkaSettings);
         responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
         responseBuilder.clientId(transportContext.getNodeId());
-        responseBuilder.groupId("transport-node");
+        responseBuilder.groupId(null);
         responseBuilder.autoCommit(true);
         responseBuilder.autoCommitIntervalMs(autoCommitInterval);
         responseBuilder.decoder(new TransportApiResponseDecoder());
@@ -91,16 +126,79 @@ public class MqttTransportService implements TransportService {
         builder.pollInterval(responsePollDuration);
         transportApiTemplate = builder.build();
         transportApiTemplate.init();
+
+        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder();
+        ruleEngineProducerBuilder.settings(kafkaSettings);
+        ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic);
+        ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder());
+        ruleEngineProducer = ruleEngineProducerBuilder.build();
+        ruleEngineProducer.init();
+
+        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
+        mainConsumerBuilder.settings(kafkaSettings);
+        mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId());
+        mainConsumerBuilder.clientId(transportContext.getNodeId());
+        mainConsumerBuilder.groupId(null);
+        mainConsumerBuilder.autoCommit(true);
+        mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
+        mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
+        mainConsumer = mainConsumerBuilder.build();
+        mainConsumer.subscribe();
+
+        mainConsumerExecutor.execute(() -> {
+            while (!stopped) {
+                try {
+                    ConsumerRecords<String, byte[]> records = mainConsumer.poll(Duration.ofMillis(notificationsPollDuration));
+                    records.forEach(record -> {
+                        try {
+                            ToTransportMsg toTransportMsg = mainConsumer.decode(record);
+                            if (toTransportMsg.hasToDeviceSessionMsg()) {
+                                TransportProtos.DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
+                                UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
+                                SessionMsgListener listener = sessions.get(sessionId);
+                                if (listener != null) {
+                                    transportCallbackExecutor.submit(() -> {
+                                        if (toSessionMsg.hasGetAttributesResponse()) {
+                                            listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
+                                        }
+                                    });
+                                } else {
+                                    //TODO: should we notify the device actor about missed session?
+                                    log.debug("[{}] Missing session.", sessionId);
+                                }
+
+                            }
+                        } catch (Throwable e) {
+                            log.warn("Failed to process the notification.", e);
+                        }
+                    });
+                } catch (Exception e) {
+                    log.warn("Failed to obtain messages from queue.", e);
+                    try {
+                        Thread.sleep(notificationsPollDuration);
+                    } catch (InterruptedException e2) {
+                        log.trace("Failed to wait until the server has capacity to handle new requests", e2);
+                    }
+                }
+            }
+        });
     }
 
     @PreDestroy
     public void destroy() {
+        stopped = true;
         if (transportApiTemplate != null) {
             transportApiTemplate.stop();
         }
         if (transportCallbackExecutor != null) {
             transportCallbackExecutor.shutdownNow();
         }
+        if (mainConsumer != null) {
+            mainConsumer.unsubscribe();
+        }
+        if (mainConsumerExecutor != null) {
+            mainConsumerExecutor.shutdownNow();
+        }
     }
 
     @Override
@@ -118,17 +216,69 @@ public class MqttTransportService implements TransportService {
     }
 
     @Override
-    public void process(SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+    public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSessionEvent(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
 
+    @Override
+    public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setPostTelemetry(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+    public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setPostAttributes(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
+    }
 
+    @Override
+    public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
+        sessions.putIfAbsent(toId(sessionInfo), listener);
+        //TODO: monitor sessions periodically: PING REQ/RESP, etc.
     }
 
     @Override
-    public void process(PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+    public void deregisterSession(SessionInfoProto sessionInfo) {
+        sessions.remove(toId(sessionInfo));
+    }
+
+    private UUID toId(SessionInfoProto sessionInfo) {
+        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+    }
+
+    private String getRoutingKey(SessionInfoProto sessionInfo) {
+        return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
+    }
+
+    private static class TransportCallbackAdaptor implements Callback {
+        private final TransportServiceCallback<Void> callback;
+
+        TransportCallbackAdaptor(TransportServiceCallback<Void> callback) {
+            this.callback = callback;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception == null) {
+                callback.onSuccess(null);
+            } else {
+                callback.onError(exception);
+            }
+        }
+    }
 
+    private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
+        ruleEngineProducer.send(getRoutingKey(sessionInfo), toRuleEngineMsg, new TransportCallbackAdaptor(callback));
     }
 }
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
new file mode 100644
index 0000000..ee929ce
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt.service;
+
+import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class ToRuleEngineMsgEncoder implements TbKafkaEncoder<ToRuleEngineMsg> {
+    @Override
+    public byte[] encode(ToRuleEngineMsg value) {
+        return value.toByteArray();
+    }
+}
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
index 707fb4b..735eee0 100644
--- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -82,3 +82,7 @@ kafka:
     response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
   rule_engine:
     topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
+  notifications:
+    topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
+    poll_interval: "${TB_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
+    auto_commit_interval: "${TB_TRANSPORT_NOTIFICATIONS_AUTO_COMMIT_INTERVAL_MS:100}"