thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 39(+23 -16)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 6(+4 -2)
application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java 14(+7 -7)
application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java 7(+7 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 714ab57..9f27641 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
@@ -58,6 +59,7 @@ public class AppActor extends RuleChainManagerActor {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
private final TenantService tenantService;
private final BiMap<TenantId, ActorRef> tenantActors;
+ private boolean ruleChainsInitialized;
private AppActor(ActorSystemContext systemContext) {
super(systemContext, new SystemRuleChainManager(systemContext));
@@ -72,26 +74,20 @@ public class AppActor extends RuleChainManagerActor {
@Override
public void preStart() {
- log.info("Starting main system actor.");
- try {
- initRuleChains();
- if (systemContext.isTenantComponentsInitEnabled()) {
- PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
- for (Tenant tenant : tenantIterator) {
- log.debug("[{}] Creating tenant actor", tenant.getId());
- getOrCreateTenantActor(tenant.getId());
- log.debug("Tenant actor created.");
- }
- }
- log.info("Main system actor started.");
- } catch (Exception e) {
- log.warn("Unknown failure", e);
- }
}
@Override
protected boolean process(TbActorMsg msg) {
+ if (!ruleChainsInitialized) {
+ initRuleChainsAndTenantActors();
+ ruleChainsInitialized = true;
+ if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
+ log.warn("Rule Chains initialized by unexpected message: {}", msg);
+ }
+ }
switch (msg.getMsgType()) {
+ case APP_INIT_MSG:
+ break;
case SEND_TO_CLUSTER_MSG:
onPossibleClusterMsg((SendToClusterMsg) msg);
break;
@@ -119,6 +115,24 @@ public class AppActor extends RuleChainManagerActor {
return true;
}
+ private void initRuleChainsAndTenantActors() {
+ log.info("Starting main system actor.");
+ try {
+ initRuleChains();
+ if (systemContext.isTenantComponentsInitEnabled()) {
+ PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
+ for (Tenant tenant : tenantIterator) {
+ log.debug("[{}] Creating tenant actor", tenant.getId());
+ getOrCreateTenantActor(tenant.getId());
+ log.debug("Tenant actor created.");
+ }
+ }
+ log.info("Main system actor started.");
+ } catch (Exception e) {
+ log.warn("Unknown failure", e);
+ }
+ }
+
private void onPossibleClusterMsg(SendToClusterMsg msg) {
Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
if (address.isPresent()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java b/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java
new file mode 100644
index 0000000..fd8c178
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppInitMsg.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.app;
+
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+public class AppInitMsg implements TbActorMsg {
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.APP_INIT_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 41d1ea0..833e5ea 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
@@ -483,19 +484,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
- private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
- UUID sessionId = getSessionId(sessionInfo);
- SessionInfoMetaData sessionMD = sessions.get(sessionId);
- if (sessionMD != null) {
- sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
- sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
- sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
- if (subscriptionInfo.getAttributeSubscription()) {
- attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
- }
- if (subscriptionInfo.getRpcSubscription()) {
- rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
- }
+ private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
+ UUID sessionId = getSessionId(sessionInfoProto);
+ SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
+ id -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
+
+ sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
+ sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
+ sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
+ if (subscriptionInfo.getAttributeSubscription()) {
+ attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+ }
+ if (subscriptionInfo.getRpcSubscription()) {
+ rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
dumpSessions();
}
@@ -629,8 +630,14 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private void restoreSessions() {
log.debug("[{}] Restoring sessions from cache", deviceId);
- TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
- if (sessionsDump.getSerializedSize() == 0) {
+ TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
+ try {
+ sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
+ } catch (InvalidProtocolBufferException e) {
+ log.warn("[{}] Failed to decode device sessions from cache", deviceId);
+ return;
+ }
+ if (sessionsDump.getSessionsCount() == 0) {
log.debug("[{}] No session information found", deviceId);
return;
}
@@ -677,7 +684,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
});
systemContext.getDeviceSessionCacheService()
.put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
- .addAllSessions(sessionsList).build());
+ .addAllSessions(sessionsList).build().toByteArray());
}
void initSessionTimeout(ActorContext context) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 85b8943..69061ba 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -22,11 +22,14 @@ import akka.actor.Terminated;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.app.AppActor;
+import org.thingsboard.server.actors.app.AppInitMsg;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcManagerActor;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
@@ -54,6 +57,12 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
@Service
@@ -86,6 +95,8 @@ public class DefaultActorService implements ActorService {
private ActorRef rpcManagerActor;
+ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
@PostConstruct
public void initActorSystem() {
log.info("Initializing Actor system. {}", actorContext.getRuleChainService());
@@ -106,6 +117,12 @@ public class DefaultActorService implements ActorService {
log.info("Actor system initialized.");
}
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Sending application init message to actor system");
+ appActor.tell(new AppInitMsg(), ActorRef.noSender());
+ }
+
@PreDestroy
public void stopActorSystem() {
Future<Terminated> status = system.terminate();
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index f5321e5..e0bdd12 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -38,6 +38,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.server.actors.service.ActorService;
@@ -61,7 +62,7 @@ import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.
@Service
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
@Slf4j
-public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener, ApplicationListener<ApplicationReadyEvent> {
+public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
@Value("${zk.url}")
private String zkUrl;
@@ -232,8 +233,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.collect(Collectors.toList());
}
- @Override
+ @EventListener(ApplicationReadyEvent.class)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Starting current ZK node.");
if (stopped) {
log.debug("Ignoring application ready event. Service is stopped.");
return;
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
index 6201dab..cc8ecee 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -22,7 +22,6 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
-import java.util.ArrayList;
import java.util.Collections;
import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
@@ -35,16 +34,17 @@ import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
@Override
- @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
- public DeviceSessionsCacheEntry get(DeviceId deviceId) {
+ @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
+ public byte[] get(DeviceId deviceId) {
log.debug("[{}] Fetching session data from cache", deviceId);
- return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
+ return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build().toByteArray();
}
@Override
- @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
- public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
- log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
+ @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
+ public byte[] put(DeviceId deviceId, byte[] sessions) {
+ log.debug("[{}] Pushing session data to cache: {}", deviceId, sessions);
return sessions;
}
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
index 0a2e6a5..a9a1702 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
@@ -23,8 +23,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheE
*/
public interface DeviceSessionCacheService {
- DeviceSessionsCacheEntry get(DeviceId deviceId);
+ byte[] get(DeviceId deviceId);
- DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
+ byte[] put(DeviceId deviceId, byte[] sessions);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index a8ab2cd..45f8433 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -29,6 +29,9 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ActorService;
@@ -127,7 +130,11 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
ruleEngineConsumer = ruleEngineConsumerBuilder.build();
ruleEngineConsumer.subscribe();
+ }
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Starting polling for events.");
LocalBucketBuilder builder = Bucket4j.builder();
builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 6d422f4..88033aa 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -19,6 +19,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@@ -93,6 +95,11 @@ public class RemoteTransportApiService {
builder.executor(transportCallbackExecutor);
builder.handler(transportApiService);
transportApiTemplate = builder.build();
+ }
+
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+ log.info("Received application ready event. Starting polling for events.");
transportApiTemplate.init();
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 44a98d6..a63ef6b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -28,6 +28,8 @@ public enum MsgType {
*/
CLUSTER_EVENT_MSG,
+ APP_INIT_MSG,
+
/**
* All messages, could be send to cluster
*/
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index a178bdf..fa6e557 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -141,7 +141,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
- if (checkConnected(ctx)) {
+ if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
@@ -150,7 +150,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
break;
case DISCONNECT:
- if (checkConnected(ctx)) {
+ if (checkConnected(ctx, msg)) {
processDisconnect(ctx);
}
break;
@@ -161,7 +161,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
- if (!checkConnected(ctx)) {
+ if (!checkConnected(ctx, mqttMsg)) {
return;
}
String topicName = mqttMsg.variableHeader().topicName();
@@ -248,7 +248,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
- if (!checkConnected(ctx)) {
+ if (!checkConnected(ctx, mqttMsg)) {
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@@ -293,7 +293,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
- if (!checkConnected(ctx)) {
+ if (!checkConnected(ctx, mqttMsg)) {
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@@ -444,11 +444,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
}
- private boolean checkConnected(ChannelHandlerContext ctx) {
+ private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {
if (deviceSessionCtx.isConnected()) {
return true;
} else {
- log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
+ log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
ctx.close();
return false;
}
@@ -496,6 +496,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
transportService.registerAsyncSession(sessionInfo, this);
checkGatewaySession();
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ log.info("[{}] Client connected!", sessionId);
}
}