thingsboard-aplcache

Fixed initialization order

11/6/2018 11:42:01 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 714ab57..9f27641 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.Tenant;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
@@ -58,6 +59,7 @@ public class AppActor extends RuleChainManagerActor {
     private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
     private final TenantService tenantService;
     private final BiMap<TenantId, ActorRef> tenantActors;
+    private boolean ruleChainsInitialized;
 
     private AppActor(ActorSystemContext systemContext) {
         super(systemContext, new SystemRuleChainManager(systemContext));
@@ -72,26 +74,20 @@ public class AppActor extends RuleChainManagerActor {
 
     @Override
     public void preStart() {
-        log.info("Starting main system actor.");
-        try {
-            initRuleChains();
-            if (systemContext.isTenantComponentsInitEnabled()) {
-                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
-                for (Tenant tenant : tenantIterator) {
-                    log.debug("[{}] Creating tenant actor", tenant.getId());
-                    getOrCreateTenantActor(tenant.getId());
-                    log.debug("Tenant actor created.");
-                }
-            }
-            log.info("Main system actor started.");
-        } catch (Exception e) {
-            log.warn("Unknown failure", e);
-        }
     }
 
     @Override
     protected boolean process(TbActorMsg msg) {
+        if (!ruleChainsInitialized) {
+            initRuleChainsAndTenantActors();
+            ruleChainsInitialized = true;
+            if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
+                log.warn("Rule Chains initialized by unexpected message: {}", msg);
+            }
+        }
         switch (msg.getMsgType()) {
+            case APP_INIT_MSG:
+                break;
             case SEND_TO_CLUSTER_MSG:
                 onPossibleClusterMsg((SendToClusterMsg) msg);
                 break;
@@ -119,6 +115,24 @@ public class AppActor extends RuleChainManagerActor {
         return true;
     }
 
+    private void initRuleChainsAndTenantActors() {
+        log.info("Starting main system actor.");
+        try {
+            initRuleChains();
+            if (systemContext.isTenantComponentsInitEnabled()) {
+                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
+                for (Tenant tenant : tenantIterator) {
+                    log.debug("[{}] Creating tenant actor", tenant.getId());
+                    getOrCreateTenantActor(tenant.getId());
+                    log.debug("Tenant actor created.");
+                }
+            }
+            log.info("Main system actor started.");
+        } catch (Exception e) {
+            log.warn("Unknown failure", e);
+        }
+    }
+
     private void onPossibleClusterMsg(SendToClusterMsg msg) {
         Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
         if (address.isPresent()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java b/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java
new file mode 100644
index 0000000..fd8c178
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.app;
+
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+public class AppInitMsg implements TbActorMsg {
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.APP_INIT_MSG;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 41d1ea0..833e5ea 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.api.RpcError;
 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
@@ -483,19 +484,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
         }
     }
 
-    private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
-        UUID sessionId = getSessionId(sessionInfo);
-        SessionInfoMetaData sessionMD = sessions.get(sessionId);
-        if (sessionMD != null) {
-            sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
-            sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
-            sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
-            if (subscriptionInfo.getAttributeSubscription()) {
-                attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
-            }
-            if (subscriptionInfo.getRpcSubscription()) {
-                rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
-            }
+    private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
+        UUID sessionId = getSessionId(sessionInfoProto);
+        SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
+                id -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
+
+        sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
+        sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
+        sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
+        if (subscriptionInfo.getAttributeSubscription()) {
+            attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+        }
+        if (subscriptionInfo.getRpcSubscription()) {
+            rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
         }
         dumpSessions();
     }
@@ -629,8 +630,14 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 
     private void restoreSessions() {
         log.debug("[{}] Restoring sessions from cache", deviceId);
-        TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
-        if (sessionsDump.getSerializedSize() == 0) {
+        TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
+        try {
+            sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
+        } catch (InvalidProtocolBufferException e) {
+            log.warn("[{}] Failed to decode device sessions from cache", deviceId);
+            return;
+        }
+        if (sessionsDump.getSessionsCount() == 0) {
             log.debug("[{}] No session information found", deviceId);
             return;
         }
@@ -677,7 +684,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
         });
         systemContext.getDeviceSessionCacheService()
                 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
-                        .addAllSessions(sessionsList).build());
+                        .addAllSessions(sessionsList).build().toByteArray());
     }
 
     void initSessionTimeout(ActorContext context) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 85b8943..69061ba 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -22,11 +22,14 @@ import akka.actor.Terminated;
 import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Service;
 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.app.AppActor;
+import org.thingsboard.server.actors.app.AppInitMsg;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcManagerActor;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
@@ -54,6 +57,12 @@ import scala.concurrent.duration.Duration;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
 
 @Service
@@ -86,6 +95,8 @@ public class DefaultActorService implements ActorService {
 
     private ActorRef rpcManagerActor;
 
+    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
     @PostConstruct
     public void initActorSystem() {
         log.info("Initializing Actor system. {}", actorContext.getRuleChainService());
@@ -106,6 +117,12 @@ public class DefaultActorService implements ActorService {
         log.info("Actor system initialized.");
     }
 
+    @EventListener(ApplicationReadyEvent.class)
+    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        log.info("Received application ready event. Sending application init message to actor system");
+        appActor.tell(new AppInitMsg(), ActorRef.noSender());
+    }
+
     @PreDestroy
     public void stopActorSystem() {
         Future<Terminated> status = system.terminate();
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index f5321e5..e0bdd12 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -38,6 +38,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.ApplicationListener;
 import org.springframework.context.annotation.Lazy;
+import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import org.thingsboard.server.actors.service.ActorService;
@@ -61,7 +62,7 @@ import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.
 @Service
 @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
 @Slf4j
-public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener, ApplicationListener<ApplicationReadyEvent> {
+public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
 
     @Value("${zk.url}")
     private String zkUrl;
@@ -232,8 +233,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                 .collect(Collectors.toList());
     }
 
-    @Override
+    @EventListener(ApplicationReadyEvent.class)
     public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        log.info("Received application ready event. Starting current ZK node.");
         if (stopped) {
             log.debug("Ignoring application ready event. Service is stopped.");
             return;
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
index 6201dab..cc8ecee 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -22,7 +22,6 @@ import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
 
-import java.util.ArrayList;
 import java.util.Collections;
 
 import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
@@ -35,16 +34,17 @@ import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
 public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
 
     @Override
-    @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
-    public DeviceSessionsCacheEntry get(DeviceId deviceId) {
+    @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
+    public byte[] get(DeviceId deviceId) {
         log.debug("[{}] Fetching session data from cache", deviceId);
-        return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
+        return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build().toByteArray();
     }
 
     @Override
-    @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
-    public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
-        log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
+    @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
+    public byte[] put(DeviceId deviceId, byte[] sessions) {
+        log.debug("[{}] Pushing session data to cache: {}", deviceId, sessions);
         return sessions;
     }
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
index 0a2e6a5..a9a1702 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
@@ -23,8 +23,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheE
  */
 public interface DeviceSessionCacheService {
 
-    DeviceSessionsCacheEntry get(DeviceId deviceId);
+    byte[] get(DeviceId deviceId);
 
-    DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
+    byte[] put(DeviceId deviceId, byte[] sessions);
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index a8ab2cd..45f8433 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -29,6 +29,9 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ActorService;
@@ -127,7 +130,11 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
 
         ruleEngineConsumer = ruleEngineConsumerBuilder.build();
         ruleEngineConsumer.subscribe();
+    }
 
+    @EventListener(ApplicationReadyEvent.class)
+    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        log.info("Received application ready event. Starting polling for events.");
         LocalBucketBuilder builder = Bucket4j.builder();
         builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
         builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 6d422f4..88033aa 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -19,6 +19,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@@ -93,6 +95,11 @@ public class RemoteTransportApiService {
         builder.executor(transportCallbackExecutor);
         builder.handler(transportApiService);
         transportApiTemplate = builder.build();
+    }
+
+    @EventListener(ApplicationReadyEvent.class)
+    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        log.info("Received application ready event. Starting polling for events.");
         transportApiTemplate.init();
     }
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 44a98d6..a63ef6b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -28,6 +28,8 @@ public enum MsgType {
      */
     CLUSTER_EVENT_MSG,
 
+    APP_INIT_MSG,
+
     /**
      * All messages, could be send  to cluster
     */
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index a178bdf..fa6e557 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -141,7 +141,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                 break;
             case PINGREQ:
-                if (checkConnected(ctx)) {
+                if (checkConnected(ctx, msg)) {
                     ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                     transportService.reportActivity(sessionInfo);
                     if (gatewaySessionHandler != null) {
@@ -150,7 +150,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                 }
                 break;
             case DISCONNECT:
-                if (checkConnected(ctx)) {
+                if (checkConnected(ctx, msg)) {
                     processDisconnect(ctx);
                 }
                 break;
@@ -161,7 +161,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
-        if (!checkConnected(ctx)) {
+        if (!checkConnected(ctx, mqttMsg)) {
             return;
         }
         String topicName = mqttMsg.variableHeader().topicName();
@@ -248,7 +248,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
-        if (!checkConnected(ctx)) {
+        if (!checkConnected(ctx, mqttMsg)) {
             return;
         }
         log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@@ -293,7 +293,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
-        if (!checkConnected(ctx)) {
+        if (!checkConnected(ctx, mqttMsg)) {
             return;
         }
         log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@@ -444,11 +444,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
     }
 
-    private boolean checkConnected(ChannelHandlerContext ctx) {
+    private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {
         if (deviceSessionCtx.isConnected()) {
             return true;
         } else {
-            log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
+            log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
             ctx.close();
             return false;
         }
@@ -496,6 +496,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             transportService.registerAsyncSession(sessionInfo, this);
             checkGatewaySession();
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+            log.info("[{}] Client connected!", sessionId);
         }
     }