diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index f150574..4d08fe0 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -75,6 +75,10 @@ mqtt:
bind_port: "${MQTT_BIND_PORT:1883}"
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
timeout: "${MQTT_TIMEOUT:10000}"
+ netty:
+ leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+ boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
+ worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
# Uncomment the following lines to enable ssl for MQTT
# ssl:
# key_store: keystore/mqttserver.jks
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 1543400..8710809 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -20,8 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -67,6 +66,14 @@ public class MqttTransportService {
@Value("${mqtt.adaptor}")
private String adaptorName;
+ @Value("${mqtt.netty.leak_detector_level}")
+ private String leakDetectorLevel;
+ @Value("${mqtt.netty.boss_group_thread_count}")
+ private Integer bossGroupThreadCount;
+ @Value("${mqtt.netty.worker_group_thread_count}")
+ private Integer workerGroupThreadCount;
+
+
private MqttTransportAdaptor adaptor;
private Channel serverChannel;
@@ -75,17 +82,19 @@ public class MqttTransportService {
@PostConstruct
public void init() throws Exception {
+ log.info("Setting resource leak detector level to {}", leakDetectorLevel);
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
+
log.info("Starting MQTT transport...");
log.info("Lookup MQTT transport adaptor {}", adaptorName);
this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
log.info("Starting MQTT transport server");
- bossGroup = new NioEventLoopGroup(1);
- workerGroup = new NioEventLoopGroup();
+ bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
+ workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.TRACE))
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
serverChannel = b.bind(host, port).sync().channel();