thingsboard-aplcache

Details

diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java
index a59d83b..bcb8bd1 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java
@@ -40,6 +40,8 @@ public final class MqttClientConfig {
     private Class<? extends Channel> channelClass = NioSocketChannel.class;
 
     private boolean reconnect = true;
+    private long reconnectDelay = 1L;
+    private int maxBytesInMessage = 8092;
 
     public MqttClientConfig() {
         this(null);
@@ -146,4 +148,38 @@ public final class MqttClientConfig {
     public void setReconnect(boolean reconnect) {
         this.reconnect = reconnect;
     }
+
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    /**
+     * Sets the reconnect delay in seconds. Defaults to 1 second.
+     * @param reconnectDelay
+     * @throws IllegalArgumentException if reconnectDelay is smaller than 1.
+     */
+    public void setReconnectDelay(long reconnectDelay) {
+        if (reconnectDelay <= 0) {
+            throw new IllegalArgumentException("reconnectDelay must be > 0");
+        }
+        this.reconnectDelay = reconnectDelay;
+    }
+
+    public int getMaxBytesInMessage() {
+        return maxBytesInMessage;
+    }
+
+    /**
+     * Sets the maximum number of bytes in the message for the {@link io.netty.handler.codec.mqtt.MqttDecoder}.
+     * Default value is 8092 as specified by Netty. The absolute maximum size is 256MB as set by the MQTT spec.
+     *
+     * @param maxBytesInMessage
+     * @throws IllegalArgumentException if maxBytesInMessage is smaller than 1 or greater than 256_000_000.
+     */
+    public void setMaxBytesInMessage(int maxBytesInMessage) {
+        if (maxBytesInMessage <= 0 || maxBytesInMessage > 256_000_000) {
+            throw new IllegalArgumentException("maxBytesInMessage must be > 0 or < 256_000_000");
+        }
+        this.maxBytesInMessage = maxBytesInMessage;
+    }
 }
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
index a5df846..b9460b3 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
@@ -155,7 +155,7 @@ final class MqttClientImpl implements MqttClient {
             if (reconnect) {
                 this.reconnect = true;
             }
-            eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), 1L, TimeUnit.SECONDS);
+            eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), clientConfig.getReconnectDelay(), TimeUnit.SECONDS);
         }
     }
 
@@ -512,7 +512,7 @@ final class MqttClientImpl implements MqttClient {
                 ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port));
             }
 
-            ch.pipeline().addLast("mqttDecoder", new MqttDecoder());
+            ch.pipeline().addLast("mqttDecoder", new MqttDecoder(clientConfig.getMaxBytesInMessage()));
             ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE);
             ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0));
             ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds()));