thingsboard-aplcache

Add Netty MQTT Client module.

5/14/2018 5:20:24 AM

Changes

netty-mqtt/pom.xml 99(+99 -0)

pom.xml 17(+11 -6)

Details

diff --git a/netty-mqtt/.gitignore b/netty-mqtt/.gitignore
new file mode 100644
index 0000000..4d2302b
--- /dev/null
+++ b/netty-mqtt/.gitignore
@@ -0,0 +1,7 @@
+.idea/
+*.ipr
+*.iws
+*.ids
+*.iml
+logs
+target

netty-mqtt/pom.xml 99(+99 -0)

diff --git a/netty-mqtt/pom.xml b/netty-mqtt/pom.xml
new file mode 100644
index 0000000..4e9c84f
--- /dev/null
+++ b/netty-mqtt/pom.xml
@@ -0,0 +1,99 @@
+<!--
+
+    Copyright © 2016-2018 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.thingsboard</groupId>
+        <version>1.4.1-SNAPSHOT</version>
+        <artifactId>thingsboard</artifactId>
+    </parent>
+    <groupId>org.thingsboard</groupId>
+    <artifactId>netty-mqtt</artifactId>
+    <version>1.4.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>Netty MQTT Client</name>
+    <url>https://thingsboard.io</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <main.dir>${basedir}/..</main.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-mqtt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>3.0.1</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+    </dependencies>
+
+    <distributionManagement>
+        <repository>
+            <id>jk-5-maven</id>
+            <name>jk-5's maven server</name>
+            <url>sftp://10.2.1.2/opt/maven</url>
+        </repository>
+    </distributionManagement>
+
+    <build>
+        <extensions>
+            <extension>
+                <groupId>org.apache.maven.wagon</groupId>
+                <artifactId>wagon-ssh</artifactId>
+                <version>2.6</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java
new file mode 100644
index 0000000..a76b082
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/ChannelClosedException.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+/**
+ * Created by Valerii Sosliuk on 12/26/2017.
+ */
+public class ChannelClosedException extends RuntimeException {
+
+    private static final long serialVersionUID = 6266638352424706909L;
+
+    public ChannelClosedException() {
+    }
+
+    public ChannelClosedException(String message) {
+        super(message);
+    }
+
+    public ChannelClosedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ChannelClosedException(Throwable cause) {
+        super(cause);
+    }
+
+    public ChannelClosedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
new file mode 100644
index 0000000..ef5e7a5
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import com.google.common.collect.ImmutableSet;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.mqtt.*;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Promise;
+
+final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> {
+
+    private final MqttClientImpl client;
+    private final Promise<MqttConnectResult> connectFuture;
+
+    MqttChannelHandler(MqttClientImpl client, Promise<MqttConnectResult> connectFuture) {
+        this.client = client;
+        this.connectFuture = connectFuture;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
+        switch (msg.fixedHeader().messageType()) {
+            case CONNACK:
+                handleConack(ctx.channel(), (MqttConnAckMessage) msg);
+                break;
+            case SUBACK:
+                handleSubAck((MqttSubAckMessage) msg);
+                break;
+            case PUBLISH:
+                handlePublish(ctx.channel(), (MqttPublishMessage) msg);
+                break;
+            case UNSUBACK:
+                handleUnsuback((MqttUnsubAckMessage) msg);
+                break;
+            case PUBACK:
+                handlePuback((MqttPubAckMessage) msg);
+                break;
+            case PUBREC:
+                handlePubrec(ctx.channel(), msg);
+                break;
+            case PUBREL:
+                handlePubrel(ctx.channel(), msg);
+                break;
+            case PUBCOMP:
+                handlePubcomp(msg);
+                break;
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
+                this.client.getClientConfig().getProtocolVersion().protocolName(),  // Protocol Name
+                this.client.getClientConfig().getProtocolVersion().protocolLevel(), // Protocol Level
+                this.client.getClientConfig().getUsername() != null,                // Has Username
+                this.client.getClientConfig().getPassword() != null,                // Has Password
+                this.client.getClientConfig().getLastWill() != null                 // Will Retain
+                        && this.client.getClientConfig().getLastWill().isRetain(),
+                this.client.getClientConfig().getLastWill() != null                 // Will QOS
+                        ? this.client.getClientConfig().getLastWill().getQos().value()
+                        : 0,
+                this.client.getClientConfig().getLastWill() != null,                // Has Will
+                this.client.getClientConfig().isCleanSession(),                     // Clean Session
+                this.client.getClientConfig().getTimeoutSeconds()                   // Timeout
+        );
+        MqttConnectPayload payload = new MqttConnectPayload(
+                this.client.getClientConfig().getClientId(),
+                this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getTopic() : null,
+                this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getMessage().getBytes(CharsetUtil.UTF_8) : null,
+                this.client.getClientConfig().getUsername(),
+                this.client.getClientConfig().getPassword() != null ? this.client.getClientConfig().getPassword().getBytes(CharsetUtil.UTF_8) : null
+        );
+        ctx.channel().writeAndFlush(new MqttConnectMessage(fixedHeader, variableHeader, payload));
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+    }
+
+    private void invokeHandlersForIncomingPublish(MqttPublishMessage message) {
+        for (MqttSubscribtion subscribtion : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
+            if (subscribtion.matches(message.variableHeader().topicName())) {
+                if (subscribtion.isOnce() && subscribtion.isCalled()) {
+                    continue;
+                }
+                message.payload().markReaderIndex();
+                subscribtion.setCalled(true);
+                subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
+                if (subscribtion.isOnce()) {
+                    this.client.off(subscribtion.getTopic(), subscribtion.getHandler());
+                }
+                message.payload().resetReaderIndex();
+            }
+        }
+        /*Set<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.client.getSubscriptions().get(message.variableHeader().topicName()));
+        for (MqttSubscribtion subscribtion : subscribtions) {
+            if(subscribtion.isOnce() && subscribtion.isCalled()){
+                continue;
+            }
+            message.payload().markReaderIndex();
+            subscribtion.setCalled(true);
+            subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
+            if(subscribtion.isOnce()){
+                this.client.off(subscribtion.getTopic(), subscribtion.getHandler());
+            }
+            message.payload().resetReaderIndex();
+        }*/
+        message.payload().release();
+    }
+
+    private void handleConack(Channel channel, MqttConnAckMessage message) {
+        switch (message.variableHeader().connectReturnCode()) {
+            case CONNECTION_ACCEPTED:
+                this.connectFuture.setSuccess(new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture()));
+
+                this.client.getPendingSubscribtions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> {
+                    channel.write(e.getValue().getSubscribeMessage());
+                    e.getValue().setSent(true);
+                });
+
+                this.client.getPendingPublishes().forEach((id, publish) -> {
+                    if (publish.isSent()) return;
+                    channel.write(publish.getMessage());
+                    publish.setSent(true);
+                    if (publish.getQos() == MqttQoS.AT_MOST_ONCE) {
+                        publish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0
+                        this.client.getPendingPublishes().remove(publish.getMessageId());
+                    }
+                });
+                channel.flush();
+                break;
+
+            case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
+            case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
+            case CONNECTION_REFUSED_NOT_AUTHORIZED:
+            case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
+            case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
+                this.connectFuture.setSuccess(new MqttConnectResult(false, message.variableHeader().connectReturnCode(), channel.closeFuture()));
+                channel.close();
+                // Don't start reconnect logic here
+                break;
+        }
+    }
+
+    private void handleSubAck(MqttSubAckMessage message) {
+        MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscribtions().remove(message.variableHeader().messageId());
+        if (pendingSubscription == null) {
+            return;
+        }
+        pendingSubscription.onSubackReceived();
+        for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
+            MqttSubscribtion subscribtion = new MqttSubscribtion(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
+            this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);
+            this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion);
+        }
+        this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
+
+        this.client.getServerSubscribtions().add(pendingSubscription.getTopic());
+
+        if (!pendingSubscription.getFuture().isDone()) {
+            pendingSubscription.getFuture().setSuccess(null);
+        }
+    }
+
+    private void handlePublish(Channel channel, MqttPublishMessage message) {
+        switch (message.fixedHeader().qosLevel()) {
+            case AT_MOST_ONCE:
+                invokeHandlersForIncomingPublish(message);
+                break;
+
+            case AT_LEAST_ONCE:
+                invokeHandlersForIncomingPublish(message);
+                if (message.variableHeader().messageId() != -1) {
+                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
+                    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId());
+                    channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
+                }
+                break;
+
+            case EXACTLY_ONCE:
+                if (message.variableHeader().messageId() != -1) {
+                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
+                    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId());
+                    MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader);
+
+                    MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage);
+                    this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().messageId(), incomingQos2Publish);
+                    message.payload().retain();
+                    incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
+
+                    channel.writeAndFlush(pubrecMessage);
+                }
+                break;
+        }
+    }
+
+    private void handleUnsuback(MqttUnsubAckMessage message) {
+        MqttPendingUnsubscribtion unsubscribtion = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
+        if (unsubscribtion == null) {
+            return;
+        }
+        unsubscribtion.onUnsubackReceived();
+        this.client.getServerSubscribtions().remove(unsubscribtion.getTopic());
+        unsubscribtion.getFuture().setSuccess(null);
+        this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
+    }
+
+    private void handlePuback(MqttPubAckMessage message) {
+        MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(message.variableHeader().messageId());
+        pendingPublish.getFuture().setSuccess(null);
+        pendingPublish.onPubackReceived();
+        this.client.getPendingPublishes().remove(message.variableHeader().messageId());
+        pendingPublish.getPayload().release();
+    }
+
+    private void handlePubrec(Channel channel, MqttMessage message) {
+        MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
+        pendingPublish.onPubackReceived();
+
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
+        MqttMessage pubrelMessage = new MqttMessage(fixedHeader, variableHeader);
+        channel.writeAndFlush(pubrelMessage);
+
+        pendingPublish.setPubrelMessage(pubrelMessage);
+        pendingPublish.startPubrelRetransmissionTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
+    }
+
+    private void handlePubrel(Channel channel, MqttMessage message) {
+        if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) {
+            MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
+            this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
+            incomingQos2Publish.onPubrelReceived();
+            this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().messageId());
+        }
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
+        channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
+    }
+
+    private void handlePubcomp(MqttMessage message) {
+        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
+        MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(variableHeader.messageId());
+        pendingPublish.getFuture().setSuccess(null);
+        this.client.getPendingPublishes().remove(variableHeader.messageId());
+        pendingPublish.getPayload().release();
+        pendingPublish.onPubcompReceived();
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
new file mode 100644
index 0000000..6563525
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
@@ -0,0 +1,205 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.util.concurrent.Future;
+
+public interface MqttClient {
+
+    /**
+     * Connect to the specified hostname/ip. By default uses port 1883.
+     * If you want to change the port number, see {@link #connect(String, int)}
+     *
+     * @param host The ip address or host to connect to
+     * @return A future which will be completed when the connection is opened and we received an CONNACK
+     */
+    Future<MqttConnectResult> connect(String host);
+
+    /**
+     * Connect to the specified hostname/ip using the specified port
+     *
+     * @param host The ip address or host to connect to
+     * @param port The tcp port to connect to
+     * @return A future which will be completed when the connection is opened and we received an CONNACK
+     */
+    Future<MqttConnectResult> connect(String host, int port);
+
+    /**
+     *
+     * @return boolean value indicating if channel is active
+     */
+    boolean isConnected();
+
+    /**
+     * Attempt reconnect to the host that was attempted with {@link #connect(String, int)} method before
+     *
+     * @return A future which will be completed when the connection is opened and we received an CONNACK
+     * @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
+     */
+    Future<MqttConnectResult> reconnect();
+
+    /**
+     * Retrieve the netty {@link EventLoopGroup} we are using
+     * @return The netty {@link EventLoopGroup} we use for the connection
+     */
+    EventLoopGroup getEventLoop();
+
+    /**
+     * By default we use the netty {@link NioEventLoopGroup}.
+     * If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)}
+     * If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)}
+     *
+     * @param eventLoop The new eventloop to use
+     */
+    void setEventLoop(EventLoopGroup eventLoop);
+
+    /**
+     * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     *
+     * @param topic The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    Future<Void> on(String topic, MqttHandler handler);
+
+    /**
+     * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     *
+     * @param topic The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @param qos The qos to request to the server
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    Future<Void> on(String topic, MqttHandler handler, MqttQoS qos);
+
+    /**
+     * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     *
+     * @param topic The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    Future<Void> once(String topic, MqttHandler handler);
+
+    /**
+     * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     *
+     * @param topic The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @param qos The qos to request to the server
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    Future<Void> once(String topic, MqttHandler handler, MqttQoS qos);
+
+    /**
+     * Remove the subscribtion for the given topic and handler
+     * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
+     *
+     * @param topic The topic to unsubscribe for
+     * @param handler The handler to unsubscribe
+     * @return A future which will be completed when the server acknowledges our unsubscribe request
+     */
+    Future<Void> off(String topic, MqttHandler handler);
+
+    /**
+     * Remove all subscribtions for the given topic.
+     * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
+     *
+     * @param topic The topic to unsubscribe for
+     * @return A future which will be completed when the server acknowledges our unsubscribe request
+     */
+    Future<Void> off(String topic);
+
+    /**
+     * Publish a message to the given payload
+     * @param topic The topic to publish to
+     * @param payload The payload to send
+     * @return A future which will be completed when the message is sent out of the MqttClient
+     */
+    Future<Void> publish(String topic, ByteBuf payload);
+
+    /**
+     * Publish a message to the given payload, using the given qos
+     * @param topic The topic to publish to
+     * @param payload The payload to send
+     * @param qos The qos to use while publishing
+     * @return A future which will be completed when the message is delivered to the server
+     */
+    Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos);
+
+    /**
+     * Publish a message to the given payload, using optional retain
+     * @param topic The topic to publish to
+     * @param payload The payload to send
+     * @param retain true if you want to retain the message on the server, false otherwise
+     * @return A future which will be completed when the message is sent out of the MqttClient
+     */
+    Future<Void> publish(String topic, ByteBuf payload, boolean retain);
+
+    /**
+     * Publish a message to the given payload, using the given qos and optional retain
+     * @param topic The topic to publish to
+     * @param payload The payload to send
+     * @param qos The qos to use while publishing
+     * @param retain true if you want to retain the message on the server, false otherwise
+     * @return A future which will be completed when the message is delivered to the server
+     */
+    Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain);
+
+    /**
+     * Retrieve the MqttClient configuration
+     * @return The {@link MqttClientConfig} instance we use
+     */
+    MqttClientConfig getClientConfig();
+
+    /**
+     * Construct the MqttClientImpl with default config
+     */
+    static MqttClient create(){
+        return new MqttClientImpl();
+    }
+
+    /**
+     * Construct the MqttClientImpl with additional config.
+     * This config can also be changed using the {@link #getClientConfig()} function
+     *
+     * @param config The config object to use while looking for settings
+     */
+    static MqttClient create(MqttClientConfig config){
+        return new MqttClientImpl(config);
+    }
+
+
+    /**
+     * Send disconnect and close channel
+     *
+     */
+    void disconnect();
+
+    /**
+     * Sets the {@see #MqttClientCallback} object for this MqttClient
+     * @param callback The callback to be set
+     */
+    void setCallback(MqttClientCallback callback);
+
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java
new file mode 100644
index 0000000..d7f0a08
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+/**
+ * Created by Valerii Sosliuk on 12/30/2017.
+ */
+public interface MqttClientCallback {
+
+    /**
+     * This method is called when the connection to the server is lost.
+     *
+     * @param cause the reason behind the loss of connection.
+     */
+    public void connectionLost(Throwable cause);
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java
new file mode 100644
index 0000000..a59d83b
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.Channel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.mqtt.MqttVersion;
+import io.netty.handler.ssl.SslContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Random;
+
+@SuppressWarnings({"WeakerAccess", "unused"})
+public final class MqttClientConfig {
+
+    private final SslContext sslContext;
+    private final String randomClientId;
+
+    private String clientId;
+    private int timeoutSeconds = 60;
+    private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;
+    @Nullable private String username = null;
+    @Nullable private String password = null;
+    private boolean cleanSession = true;
+    @Nullable private MqttLastWill lastWill;
+    private Class<? extends Channel> channelClass = NioSocketChannel.class;
+
+    private boolean reconnect = true;
+
+    public MqttClientConfig() {
+        this(null);
+    }
+
+    public MqttClientConfig(SslContext sslContext) {
+        this.sslContext = sslContext;
+        Random random = new Random();
+        String id = "netty-mqtt/";
+        String[] options = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".split("");
+        for(int i = 0; i < 8; i++){
+            id += options[random.nextInt(options.length)];
+        }
+        this.clientId = id;
+        this.randomClientId = id;
+    }
+
+    @Nonnull
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(@Nullable String clientId) {
+        if(clientId == null){
+            this.clientId = randomClientId;
+        }else{
+            this.clientId = clientId;
+        }
+    }
+
+    public int getTimeoutSeconds() {
+        return timeoutSeconds;
+    }
+
+    public void setTimeoutSeconds(int timeoutSeconds) {
+        if(timeoutSeconds != -1 && timeoutSeconds <= 0){
+            throw new IllegalArgumentException("timeoutSeconds must be > 0 or -1");
+        }
+        this.timeoutSeconds = timeoutSeconds;
+    }
+
+    public MqttVersion getProtocolVersion() {
+        return protocolVersion;
+    }
+
+    public void setProtocolVersion(MqttVersion protocolVersion) {
+        if(protocolVersion == null){
+            throw new NullPointerException("protocolVersion");
+        }
+        this.protocolVersion = protocolVersion;
+    }
+
+    @Nullable
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(@Nullable String username) {
+        this.username = username;
+    }
+
+    @Nullable
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(@Nullable String password) {
+        this.password = password;
+    }
+
+    public boolean isCleanSession() {
+        return cleanSession;
+    }
+
+    public void setCleanSession(boolean cleanSession) {
+        this.cleanSession = cleanSession;
+    }
+
+    @Nullable
+    public MqttLastWill getLastWill() {
+        return lastWill;
+    }
+
+    public void setLastWill(@Nullable MqttLastWill lastWill) {
+        this.lastWill = lastWill;
+    }
+
+    public Class<? extends Channel> getChannelClass() {
+        return channelClass;
+    }
+
+    public void setChannelClass(Class<? extends Channel> channelClass) {
+        this.channelClass = channelClass;
+    }
+
+    public SslContext getSslContext() {
+        return sslContext;
+    }
+
+    public boolean isReconnect() {
+        return reconnect;
+    }
+
+    public void setReconnect(boolean reconnect) {
+        this.reconnect = reconnect;
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
new file mode 100644
index 0000000..3914105
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
@@ -0,0 +1,484 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times
+ */
+@SuppressWarnings({"WeakerAccess", "unused"})
+final class MqttClientImpl implements MqttClient {
+
+    private final Set<String> serverSubscribtions = new HashSet<>();
+    private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>();
+    private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
+    private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
+    private final HashMultimap<String, MqttSubscribtion> subscriptions = HashMultimap.create();
+    private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscribtions = new IntObjectHashMap<>();
+    private final Set<String> pendingSubscribeTopics = new HashSet<>();
+    private final HashMultimap<MqttHandler, MqttSubscribtion> handlerToSubscribtion = HashMultimap.create();
+    private final AtomicInteger nextMessageId = new AtomicInteger(1);
+
+    private final MqttClientConfig clientConfig;
+
+    private EventLoopGroup eventLoop;
+
+    private Channel channel;
+
+    private boolean disconnected = false;
+    private String host;
+    private int port;
+    private MqttClientCallback callback;
+
+
+    /**
+     * Construct the MqttClientImpl with default config
+     */
+    public MqttClientImpl() {
+        this.clientConfig = new MqttClientConfig();
+    }
+
+    /**
+     * Construct the MqttClientImpl with additional config.
+     * This config can also be changed using the {@link #getClientConfig()} function
+     *
+     * @param clientConfig The config object to use while looking for settings
+     */
+    public MqttClientImpl(MqttClientConfig clientConfig) {
+        this.clientConfig = clientConfig;
+    }
+
+    /**
+     * Connect to the specified hostname/ip. By default uses port 1883.
+     * If you want to change the port number, see {@link #connect(String, int)}
+     *
+     * @param host The ip address or host to connect to
+     * @return A future which will be completed when the connection is opened and we received an CONNACK
+     */
+    @Override
+    public Future<MqttConnectResult> connect(String host) {
+        return connect(host, 1883);
+    }
+
+    /**
+     * Connect to the specified hostname/ip using the specified port
+     *
+     * @param host The ip address or host to connect to
+     * @param port The tcp port to connect to
+     * @return A future which will be completed when the connection is opened and we received an CONNACK
+     */
+    @Override
+    public Future<MqttConnectResult> connect(String host, int port) {
+        if (this.eventLoop == null) {
+            this.eventLoop = new NioEventLoopGroup();
+        }
+        this.host = host;
+        this.port = port;
+
+        Promise<MqttConnectResult> connectFuture = new DefaultPromise<>(this.eventLoop.next());
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(this.eventLoop);
+        bootstrap.channel(clientConfig.getChannelClass());
+        bootstrap.remoteAddress(host, port);
+        bootstrap.handler(new MqttChannelInitializer(connectFuture, host, port, clientConfig.getSslContext()));
+        ChannelFuture future = bootstrap.connect();
+        future.addListener((ChannelFutureListener) f -> {
+            if (f.isSuccess()) {
+                MqttClientImpl.this.channel = f.channel();
+            } else if (clientConfig.isReconnect() && !disconnected) {
+                eventLoop.schedule((Runnable) () -> connect(host, port), 1L, TimeUnit.SECONDS);
+            }
+        });
+        return connectFuture;
+    }
+
+    @Override
+    public boolean isConnected() {
+        if (!disconnected) {
+            return channel == null ? false : channel.isActive();
+        };
+        return false;
+    }
+
+    @Override
+    public Future<MqttConnectResult> reconnect() {
+        if (host == null) {
+            throw new IllegalStateException("Cannot reconnect. Call connect() first");
+        }
+        return connect(host, port);
+    }
+
+    /**
+     * Retrieve the netty {@link EventLoopGroup} we are using
+     *
+     * @return The netty {@link EventLoopGroup} we use for the connection
+     */
+    @Override
+    public EventLoopGroup getEventLoop() {
+        return eventLoop;
+    }
+
+    /**
+     * By default we use the netty {@link NioEventLoopGroup}.
+     * If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)}
+     * If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)}
+     *
+     * @param eventLoop The new eventloop to use
+     */
+    @Override
+    public void setEventLoop(EventLoopGroup eventLoop) {
+        this.eventLoop = eventLoop;
+    }
+
+    /**
+     * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     *
+     * @param topic   The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    @Override
+    public Future<Void> on(String topic, MqttHandler handler) {
+        return on(topic, handler, MqttQoS.AT_MOST_ONCE);
+    }
+
+    /**
+     * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     *
+     * @param topic   The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @param qos     The qos to request to the server
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    @Override
+    public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) {
+        return createSubscribtion(topic, handler, false, qos);
+    }
+
+    /**
+     * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     *
+     * @param topic   The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    @Override
+    public Future<Void> once(String topic, MqttHandler handler) {
+        return once(topic, handler, MqttQoS.AT_MOST_ONCE);
+    }
+
+    /**
+     * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
+     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     *
+     * @param topic   The topic filter to subscribe to
+     * @param handler The handler to invoke when we receive a message
+     * @param qos     The qos to request to the server
+     * @return A future which will be completed when the server acknowledges our subscribe request
+     */
+    @Override
+    public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) {
+        return createSubscribtion(topic, handler, true, qos);
+    }
+
+    /**
+     * Remove the subscribtion for the given topic and handler
+     * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
+     *
+     * @param topic   The topic to unsubscribe for
+     * @param handler The handler to unsubscribe
+     * @return A future which will be completed when the server acknowledges our unsubscribe request
+     */
+    @Override
+    public Future<Void> off(String topic, MqttHandler handler) {
+        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
+        for (MqttSubscribtion subscribtion : this.handlerToSubscribtion.get(handler)) {
+            this.subscriptions.remove(topic, subscribtion);
+        }
+        this.handlerToSubscribtion.removeAll(handler);
+        this.checkSubscribtions(topic, future);
+        return future;
+    }
+
+    /**
+     * Remove all subscribtions for the given topic.
+     * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
+     *
+     * @param topic The topic to unsubscribe for
+     * @return A future which will be completed when the server acknowledges our unsubscribe request
+     */
+    @Override
+    public Future<Void> off(String topic) {
+        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
+        ImmutableSet<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
+        for (MqttSubscribtion subscribtion : subscribtions) {
+            for (MqttSubscribtion handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
+                this.subscriptions.remove(topic, handSub);
+            }
+            this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion);
+        }
+        this.checkSubscribtions(topic, future);
+        return future;
+    }
+
+    /**
+     * Publish a message to the given payload
+     *
+     * @param topic   The topic to publish to
+     * @param payload The payload to send
+     * @return A future which will be completed when the message is sent out of the MqttClient
+     */
+    @Override
+    public Future<Void> publish(String topic, ByteBuf payload) {
+        return publish(topic, payload, MqttQoS.AT_MOST_ONCE, false);
+    }
+
+    /**
+     * Publish a message to the given payload, using the given qos
+     *
+     * @param topic   The topic to publish to
+     * @param payload The payload to send
+     * @param qos     The qos to use while publishing
+     * @return A future which will be completed when the message is delivered to the server
+     */
+    @Override
+    public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos) {
+        return publish(topic, payload, qos, false);
+    }
+
+    /**
+     * Publish a message to the given payload, using optional retain
+     *
+     * @param topic   The topic to publish to
+     * @param payload The payload to send
+     * @param retain  true if you want to retain the message on the server, false otherwise
+     * @return A future which will be completed when the message is sent out of the MqttClient
+     */
+    @Override
+    public Future<Void> publish(String topic, ByteBuf payload, boolean retain) {
+        return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
+    }
+
+    /**
+     * Publish a message to the given payload, using the given qos and optional retain
+     *
+     * @param topic   The topic to publish to
+     * @param payload The payload to send
+     * @param qos     The qos to use while publishing
+     * @param retain  true if you want to retain the message on the server, false otherwise
+     * @return A future which will be completed when the message is delivered to the server
+     */
+    @Override
+    public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain) {
+        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
+        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
+        MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
+        MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.messageId(), future, payload.retain(), message, qos);
+        ChannelFuture channelFuture = this.sendAndFlushPacket(message);
+
+        if (channelFuture != null) {
+            pendingPublish.setSent(channelFuture != null);
+            if (channelFuture.cause() != null) {
+                future.setFailure(channelFuture.cause());
+                return future;
+            }
+        }
+        if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {
+            pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0
+        } else if (pendingPublish.isSent()) {
+            this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish);
+            pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+        }
+        return future;
+    }
+
+    /**
+     * Retrieve the MqttClient configuration
+     *
+     * @return The {@link MqttClientConfig} instance we use
+     */
+    @Override
+    public MqttClientConfig getClientConfig() {
+        return clientConfig;
+    }
+
+    @Override
+    public void disconnect() {
+        disconnected = true;
+        if (this.channel != null) {
+            MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
+            this.sendAndFlushPacket(message).addListener(future1 -> channel.close());
+        }
+    }
+
+    @Override
+    public void setCallback(MqttClientCallback callback) {
+        this.callback = callback;
+    }
+
+
+    ///////////////////////////////////////////// PRIVATE API /////////////////////////////////////////////
+
+    ChannelFuture sendAndFlushPacket(Object message) {
+        if (this.channel == null) {
+            return null;
+        }
+        if (this.channel.isActive()) {
+            return this.channel.writeAndFlush(message);
+        }
+        ChannelClosedException e = new ChannelClosedException("Channel is closed");
+        if (callback != null) {
+            callback.connectionLost(e);
+        }
+        return this.channel.newFailedFuture(e);
+    }
+
+    private MqttMessageIdVariableHeader getNewMessageId() {
+        this.nextMessageId.compareAndSet(0xffff, 1);
+        return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement());
+    }
+
+    private Future<Void> createSubscribtion(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
+        if (this.pendingSubscribeTopics.contains(topic)) {
+            Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscribtions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
+            if (subscribtionEntry.isPresent()) {
+                subscribtionEntry.get().getValue().addHandler(handler, once);
+                return subscribtionEntry.get().getValue().getFuture();
+            }
+        }
+        if (this.serverSubscribtions.contains(topic)) {
+            MqttSubscribtion subscribtion = new MqttSubscribtion(topic, handler, once);
+            this.subscriptions.put(topic, subscribtion);
+            this.handlerToSubscribtion.put(handler, subscribtion);
+            return this.channel.newSucceededFuture();
+        }
+
+        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+        MqttTopicSubscription subscription = new MqttTopicSubscription(topic, qos);
+        MqttMessageIdVariableHeader variableHeader = getNewMessageId();
+        MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription));
+        MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
+
+        final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);
+        pendingSubscribtion.addHandler(handler, once);
+        this.pendingSubscribtions.put(variableHeader.messageId(), pendingSubscribtion);
+        this.pendingSubscribeTopics.add(topic);
+        pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
+
+        pendingSubscribtion.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+
+        return future;
+    }
+
+    private void checkSubscribtions(String topic, Promise<Void> promise) {
+        if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscribtions.contains(topic)) {
+            MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+            MqttMessageIdVariableHeader variableHeader = getNewMessageId();
+            MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
+            MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
+
+            MqttPendingUnsubscribtion pendingUnsubscribtion = new MqttPendingUnsubscribtion(promise, topic, message);
+            this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscribtion);
+            pendingUnsubscribtion.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+
+            this.sendAndFlushPacket(message);
+        } else {
+            promise.setSuccess(null);
+        }
+    }
+
+    IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscribtions() {
+        return pendingSubscribtions;
+    }
+
+    HashMultimap<String, MqttSubscribtion> getSubscriptions() {
+        return subscriptions;
+    }
+
+    Set<String> getPendingSubscribeTopics() {
+        return pendingSubscribeTopics;
+    }
+
+    HashMultimap<MqttHandler, MqttSubscribtion> getHandlerToSubscribtion() {
+        return handlerToSubscribtion;
+    }
+
+    Set<String> getServerSubscribtions() {
+        return serverSubscribtions;
+    }
+
+    IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() {
+        return pendingServerUnsubscribes;
+    }
+
+    IntObjectHashMap<MqttPendingPublish> getPendingPublishes() {
+        return pendingPublishes;
+    }
+
+    IntObjectHashMap<MqttIncomingQos2Publish> getQos2PendingIncomingPublishes() {
+        return qos2PendingIncomingPublishes;
+    }
+
+    private class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        private final Promise<MqttConnectResult> connectFuture;
+        private final String host;
+        private final int port;
+        private final SslContext sslContext;
+
+
+        public MqttChannelInitializer(Promise<MqttConnectResult> connectFuture, String host, int port, SslContext sslContext) {
+            this.connectFuture = connectFuture;
+            this.host = host;
+            this.port = port;
+            this.sslContext = sslContext;
+        }
+
+        @Override
+        protected void initChannel(SocketChannel ch) throws Exception {
+            if (sslContext != null) {
+                ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port));
+            }
+
+            ch.pipeline().addLast("mqttDecoder", new MqttDecoder());
+            ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE);
+            ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0));
+            ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds()));
+            ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture));
+        }
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java
new file mode 100644
index 0000000..5fa0e6d
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttConnectResult.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+
+@SuppressWarnings({"WeakerAccess", "unused"})
+public final class MqttConnectResult {
+
+    private final boolean success;
+    private final MqttConnectReturnCode returnCode;
+    private final ChannelFuture closeFuture;
+
+    MqttConnectResult(boolean success, MqttConnectReturnCode returnCode, ChannelFuture closeFuture) {
+        this.success = success;
+        this.returnCode = returnCode;
+        this.closeFuture = closeFuture;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public MqttConnectReturnCode getReturnCode() {
+        return returnCode;
+    }
+
+    public ChannelFuture getCloseFuture() {
+        return closeFuture;
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java
new file mode 100644
index 0000000..4d6d58c
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttHandler.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.buffer.ByteBuf;
+
+public interface MqttHandler {
+
+    void onMessage(String topic, ByteBuf payload);
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java
new file mode 100644
index 0000000..af84cde
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttIncomingQos2Publish.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.mqtt.*;
+
+import java.util.function.Consumer;
+
+final class MqttIncomingQos2Publish {
+
+    private final MqttPublishMessage incomingPublish;
+
+    private final RetransmissionHandler<MqttMessage> retransmissionHandler = new RetransmissionHandler<>();
+
+    MqttIncomingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
+        this.incomingPublish = incomingPublish;
+
+        this.retransmissionHandler.setOriginalMessage(originalMessage);
+    }
+
+    MqttPublishMessage getIncomingPublish() {
+        return incomingPublish;
+    }
+
+    void startPubrecRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
+        this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
+                sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
+        this.retransmissionHandler.start(eventLoop);
+    }
+
+    void onPubrelReceived() {
+        this.retransmissionHandler.stop();
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java
new file mode 100644
index 0000000..1dadfcd
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttLastWill.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+
+@SuppressWarnings({"WeakerAccess", "unused", "SimplifiableIfStatement", "StringBufferReplaceableByString"})
+public final class MqttLastWill {
+
+    private final String topic;
+    private final String message;
+    private final boolean retain;
+    private final MqttQoS qos;
+
+    public MqttLastWill(String topic, String message, boolean retain, MqttQoS qos) {
+        if(topic == null){
+            throw new NullPointerException("topic");
+        }
+        if(message == null){
+            throw new NullPointerException("message");
+        }
+        if(qos == null){
+            throw new NullPointerException("qos");
+        }
+        this.topic = topic;
+        this.message = message;
+        this.retain = retain;
+        this.qos = qos;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public boolean isRetain() {
+        return retain;
+    }
+
+    public MqttQoS getQos() {
+        return qos;
+    }
+
+    public static MqttLastWill.Builder builder(){
+        return new MqttLastWill.Builder();
+    }
+
+    public static final class Builder {
+
+        private String topic;
+        private String message;
+        private boolean retain;
+        private MqttQoS qos;
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public Builder setTopic(String topic) {
+            if(topic == null){
+                throw new NullPointerException("topic");
+            }
+            this.topic = topic;
+            return this;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public Builder setMessage(String message) {
+            if(message == null){
+                throw new NullPointerException("message");
+            }
+            this.message = message;
+            return this;
+        }
+
+        public boolean isRetain() {
+            return retain;
+        }
+
+        public Builder setRetain(boolean retain) {
+            this.retain = retain;
+            return this;
+        }
+
+        public MqttQoS getQos() {
+            return qos;
+        }
+
+        public Builder setQos(MqttQoS qos) {
+            if(qos == null){
+                throw new NullPointerException("qos");
+            }
+            this.qos = qos;
+            return this;
+        }
+
+        public MqttLastWill build(){
+            return new MqttLastWill(topic, message, retain, qos);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MqttLastWill that = (MqttLastWill) o;
+
+        if (retain != that.retain) return false;
+        if (!topic.equals(that.topic)) return false;
+        if (!message.equals(that.message)) return false;
+        return qos == that.qos;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topic.hashCode();
+        result = 31 * result + message.hashCode();
+        result = 31 * result + (retain ? 1 : 0);
+        result = 31 * result + qos.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("MqttLastWill{");
+        sb.append("topic='").append(topic).append('\'');
+        sb.append(", message='").append(message).append('\'');
+        sb.append(", retain=").append(retain);
+        sb.append(", qos=").append(qos.name());
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java
new file mode 100644
index 0000000..c656e84
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingPublish.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.util.concurrent.Promise;
+
+import java.util.function.Consumer;
+
+final class MqttPendingPublish {
+
+    private final int messageId;
+    private final Promise<Void> future;
+    private final ByteBuf payload;
+    private final MqttPublishMessage message;
+    private final MqttQoS qos;
+
+    private final RetransmissionHandler<MqttPublishMessage> publishRetransmissionHandler = new RetransmissionHandler<>();
+    private final RetransmissionHandler<MqttMessage> pubrelRetransmissionHandler = new RetransmissionHandler<>();
+
+    private boolean sent = false;
+
+    MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) {
+        this.messageId = messageId;
+        this.future = future;
+        this.payload = payload;
+        this.message = message;
+        this.qos = qos;
+
+        this.publishRetransmissionHandler.setOriginalMessage(message);
+    }
+
+    int getMessageId() {
+        return messageId;
+    }
+
+    Promise<Void> getFuture() {
+        return future;
+    }
+
+    ByteBuf getPayload() {
+        return payload;
+    }
+
+    boolean isSent() {
+        return sent;
+    }
+
+    void setSent(boolean sent) {
+        this.sent = sent;
+    }
+
+    MqttPublishMessage getMessage() {
+        return message;
+    }
+
+    MqttQoS getQos() {
+        return qos;
+    }
+
+    void startPublishRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
+        this.publishRetransmissionHandler.setHandle(((fixedHeader, originalMessage) ->
+                sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload.retain()))));
+        this.publishRetransmissionHandler.start(eventLoop);
+    }
+
+    void onPubackReceived() {
+        this.publishRetransmissionHandler.stop();
+    }
+
+    void setPubrelMessage(MqttMessage pubrelMessage) {
+        this.pubrelRetransmissionHandler.setOriginalMessage(pubrelMessage);
+    }
+
+    void startPubrelRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
+        this.pubrelRetransmissionHandler.setHandle((fixedHeader, originalMessage) ->
+                sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
+        this.pubrelRetransmissionHandler.start(eventLoop);
+    }
+
+    void onPubcompReceived() {
+        this.pubrelRetransmissionHandler.stop();
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java
new file mode 100644
index 0000000..782aef1
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.util.concurrent.Promise;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+
+final class MqttPendingSubscribtion {
+
+    private final Promise<Void> future;
+    private final String topic;
+    private final Set<MqttPendingHandler> handlers = new HashSet<>();
+    private final MqttSubscribeMessage subscribeMessage;
+
+    private final RetransmissionHandler<MqttSubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
+
+    private boolean sent = false;
+
+    MqttPendingSubscribtion(Promise<Void> future, String topic, MqttSubscribeMessage message) {
+        this.future = future;
+        this.topic = topic;
+        this.subscribeMessage = message;
+
+        this.retransmissionHandler.setOriginalMessage(message);
+    }
+
+    Promise<Void> getFuture() {
+        return future;
+    }
+
+    String getTopic() {
+        return topic;
+    }
+
+    boolean isSent() {
+        return sent;
+    }
+
+    void setSent(boolean sent) {
+        this.sent = sent;
+    }
+
+    MqttSubscribeMessage getSubscribeMessage() {
+        return subscribeMessage;
+    }
+
+    void addHandler(MqttHandler handler, boolean once){
+        this.handlers.add(new MqttPendingHandler(handler, once));
+    }
+
+    Set<MqttPendingHandler> getHandlers() {
+        return handlers;
+    }
+
+    void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
+        if(this.sent){ //If the packet is sent, we can start the retransmit timer
+            this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
+                    sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
+            this.retransmissionHandler.start(eventLoop);
+        }
+    }
+
+    void onSubackReceived(){
+        this.retransmissionHandler.stop();
+    }
+
+    final class MqttPendingHandler {
+        private final MqttHandler handler;
+        private final boolean once;
+
+        MqttPendingHandler(MqttHandler handler, boolean once) {
+            this.handler = handler;
+            this.once = once;
+        }
+
+        MqttHandler getHandler() {
+            return handler;
+        }
+
+        boolean isOnce() {
+            return once;
+        }
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java
new file mode 100644
index 0000000..a626a81
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.util.concurrent.Promise;
+
+import java.util.function.Consumer;
+
+final class MqttPendingUnsubscribtion {
+
+    private final Promise<Void> future;
+    private final String topic;
+
+    private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
+
+    MqttPendingUnsubscribtion(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) {
+        this.future = future;
+        this.topic = topic;
+
+        this.retransmissionHandler.setOriginalMessage(unsubscribeMessage);
+    }
+
+    Promise<Void> getFuture() {
+        return future;
+    }
+
+    String getTopic() {
+        return topic;
+    }
+
+    void startRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
+        this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
+                sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
+        this.retransmissionHandler.start(eventLoop);
+    }
+
+    void onUnsubackReceived(){
+        this.retransmissionHandler.stop();
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java
new file mode 100644
index 0000000..d0fd998
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPingHandler.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import java.util.concurrent.TimeUnit;
+
+final class MqttPingHandler extends ChannelInboundHandlerAdapter {
+
+    private final int keepaliveSeconds;
+
+    private ScheduledFuture<?> pingRespTimeout;
+
+    MqttPingHandler(int keepaliveSeconds) {
+        this.keepaliveSeconds = keepaliveSeconds;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (!(msg instanceof MqttMessage)) {
+            ctx.fireChannelRead(msg);
+            return;
+        }
+        MqttMessage message = (MqttMessage) msg;
+        if(message.fixedHeader().messageType() == MqttMessageType.PINGREQ){
+            this.handlePingReq(ctx.channel());
+        } else if(message.fixedHeader().messageType() == MqttMessageType.PINGRESP){
+            this.handlePingResp();
+        }else{
+            ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
+        }
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        super.userEventTriggered(ctx, evt);
+
+        if(evt instanceof IdleStateEvent){
+            IdleStateEvent event = (IdleStateEvent) evt;
+            switch(event.state()){
+                case READER_IDLE:
+                    break;
+                case WRITER_IDLE:
+                    this.sendPingReq(ctx.channel());
+                    break;
+            }
+        }
+    }
+
+    private void sendPingReq(Channel channel){
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        channel.writeAndFlush(new MqttMessage(fixedHeader));
+
+        if(this.pingRespTimeout != null){
+            this.pingRespTimeout = channel.eventLoop().schedule(() -> {
+                MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
+                channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
+                //TODO: what do when the connection is closed ?
+            }, this.keepaliveSeconds, TimeUnit.SECONDS);
+        }
+    }
+
+    private void handlePingReq(Channel channel){
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
+        channel.writeAndFlush(new MqttMessage(fixedHeader));
+    }
+
+    private void handlePingResp(){
+        if(this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()){
+            this.pingRespTimeout.cancel(true);
+            this.pingRespTimeout = null;
+        }
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java
new file mode 100644
index 0000000..27f4cb9
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java
@@ -0,0 +1,84 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import java.util.regex.Pattern;
+
+final class MqttSubscribtion {
+
+    private final String topic;
+    private final Pattern topicRegex;
+    private final MqttHandler handler;
+
+    private final boolean once;
+
+    private boolean called;
+
+    MqttSubscribtion(String topic, MqttHandler handler, boolean once) {
+        if(topic == null){
+            throw new NullPointerException("topic");
+        }
+        if(handler == null){
+            throw new NullPointerException("handler");
+        }
+        this.topic = topic;
+        this.handler = handler;
+        this.once = once;
+        this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$");
+    }
+
+    String getTopic() {
+        return topic;
+    }
+
+    public MqttHandler getHandler() {
+        return handler;
+    }
+
+    boolean isOnce() {
+        return once;
+    }
+
+    boolean isCalled() {
+        return called;
+    }
+
+    boolean matches(String topic){
+        return this.topicRegex.matcher(topic).matches();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MqttSubscribtion that = (MqttSubscribtion) o;
+
+        return once == that.once && topic.equals(that.topic) && handler.equals(that.handler);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topic.hashCode();
+        result = 31 * result + handler.hashCode();
+        result = 31 * result + (once ? 1 : 0);
+        return result;
+    }
+
+    void setCalled(boolean called) {
+        this.called = called;
+    }
+}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java
new file mode 100644
index 0000000..36e91e5
--- /dev/null
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/RetransmissionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.mqtt;
+
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+final class RetransmissionHandler<T extends MqttMessage> {
+
+    private ScheduledFuture<?> timer;
+    private int timeout = 10;
+    private BiConsumer<MqttFixedHeader, T> handler;
+    private T originalMessage;
+
+    void start(EventLoop eventLoop){
+        if(eventLoop == null){
+            throw new NullPointerException("eventLoop");
+        }
+        if(this.handler == null){
+            throw new NullPointerException("handler");
+        }
+        this.timeout = 10;
+        this.startTimer(eventLoop);
+    }
+
+    private void startTimer(EventLoop eventLoop){
+        this.timer = eventLoop.schedule(() -> {
+            this.timeout += 5;
+            MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), true, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength());
+            handler.accept(fixedHeader, originalMessage);
+            startTimer(eventLoop);
+        }, timeout, TimeUnit.SECONDS);
+    }
+
+    void stop(){
+        if(this.timer != null){
+            this.timer.cancel(true);
+        }
+    }
+
+    void setHandle(BiConsumer<MqttFixedHeader, T> runnable) {
+        this.handler = runnable;
+    }
+
+    void setOriginalMessage(T originalMessage) {
+        this.originalMessage = originalMessage;
+    }
+}

pom.xml 17(+11 -6)

diff --git a/pom.xml b/pom.xml
index 67d48b2..297512f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,6 @@
         <dbunit.version>2.5.3</dbunit.version>
         <spring-test-dbunit.version>1.2.1</spring-test-dbunit.version>
         <postgresql.driver.version>9.4.1211</postgresql.driver.version>
-        <netty-mqtt-client.version>2.0.0TB</netty-mqtt-client.version>
         <sonar.exclusions>org/thingsboard/server/gen/**/*,
             org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*
         </sonar.exclusions>
@@ -87,6 +86,7 @@
     </properties>
 
     <modules>
+        <module>netty-mqtt</module>
         <module>common</module>
         <module>rule-engine</module>
         <module>dao</module>
@@ -326,6 +326,11 @@
         <dependencies>
             <dependency>
                 <groupId>org.thingsboard</groupId>
+                <artifactId>netty-mqtt</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.thingsboard</groupId>
                 <artifactId>extensions-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -569,6 +574,11 @@
                 <version>${netty.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-codec-mqtt</artifactId>
+                <version>${netty.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>com.datastax.cassandra</groupId>
                 <artifactId>cassandra-driver-core</artifactId>
                 <version>${cassandra.version}</version>
@@ -820,11 +830,6 @@
                 <scope>provided</scope>
             </dependency>
             <dependency>
-                <groupId>nl.jk5.netty-mqtt</groupId>
-                <artifactId>netty-mqtt</artifactId>
-                <version>${netty-mqtt-client.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>org.elasticsearch.client</groupId>
                 <artifactId>rest</artifactId>
                 <version>${elasticsearch.version}</version>
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index 3352f5a..0ef9f72 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -69,6 +69,10 @@
             <artifactId>rule-engine-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.thingsboard</groupId>
+            <artifactId>netty-mqtt</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -91,10 +95,6 @@
             <artifactId>amqp-client</artifactId>
         </dependency>
         <dependency>
-            <groupId>nl.jk5.netty-mqtt</groupId>
-            <artifactId>netty-mqtt</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.bouncycastle</groupId>
             <artifactId>bcpkix-jdk15on</artifactId>
         </dependency>
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java
index 9d5e8df..53aea96 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java
@@ -17,7 +17,7 @@
 package org.thingsboard.rule.engine.mqtt.credentials;
 
 import io.netty.handler.ssl.SslContext;
-import nl.jk5.mqtt.MqttClientConfig;
+import org.thingsboard.mqtt.MqttClientConfig;
 
 import java.util.Optional;
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java
index cbbd703..b3d86c6 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java
@@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt.credentials;
 
 import io.netty.handler.ssl.SslContext;
 import lombok.Data;
-import nl.jk5.mqtt.MqttClientConfig;
+import org.thingsboard.mqtt.MqttClientConfig;
 
 import java.util.Optional;
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java
index c9fb4a3..a462839 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java
@@ -22,7 +22,7 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import nl.jk5.mqtt.MqttClientConfig;
+import org.thingsboard.mqtt.MqttClientConfig;
 import org.apache.commons.codec.binary.Base64;
 import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.bouncycastle.openssl.PEMDecryptorProvider;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java
index 5c4594f..0ab81a8 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java
@@ -19,7 +19,7 @@ package org.thingsboard.rule.engine.mqtt.credentials;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.netty.handler.ssl.SslContext;
-import nl.jk5.mqtt.MqttClientConfig;
+import org.thingsboard.mqtt.MqttClientConfig;
 
 import java.util.Optional;
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
index 9694b04..ce54a73 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
@@ -24,9 +24,9 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.util.concurrent.Future;
 import lombok.extern.slf4j.Slf4j;
-import nl.jk5.mqtt.MqttClient;
-import nl.jk5.mqtt.MqttClientConfig;
-import nl.jk5.mqtt.MqttConnectResult;
+import org.thingsboard.mqtt.MqttClient;
+import org.thingsboard.mqtt.MqttClientConfig;
+import org.thingsboard.mqtt.MqttConnectResult;
 import org.springframework.util.StringUtils;
 import org.thingsboard.rule.engine.TbNodeUtils;
 import org.thingsboard.rule.engine.api.*;