diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
index ef5e7a5..024e15a 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
@@ -98,33 +98,25 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
}
private void invokeHandlersForIncomingPublish(MqttPublishMessage message) {
- for (MqttSubscribtion subscribtion : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
- if (subscribtion.matches(message.variableHeader().topicName())) {
- if (subscribtion.isOnce() && subscribtion.isCalled()) {
+ boolean handlerInvoked = false;
+ for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
+ if (subscription.matches(message.variableHeader().topicName())) {
+ if (subscription.isOnce() && subscription.isCalled()) {
continue;
}
message.payload().markReaderIndex();
- subscribtion.setCalled(true);
- subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
- if (subscribtion.isOnce()) {
- this.client.off(subscribtion.getTopic(), subscribtion.getHandler());
+ subscription.setCalled(true);
+ subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
+ if (subscription.isOnce()) {
+ this.client.off(subscription.getTopic(), subscription.getHandler());
}
message.payload().resetReaderIndex();
+ handlerInvoked = true;
}
}
- /*Set<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.client.getSubscriptions().get(message.variableHeader().topicName()));
- for (MqttSubscribtion subscribtion : subscribtions) {
- if(subscribtion.isOnce() && subscribtion.isCalled()){
- continue;
- }
- message.payload().markReaderIndex();
- subscribtion.setCalled(true);
- subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
- if(subscribtion.isOnce()){
- this.client.off(subscribtion.getTopic(), subscribtion.getHandler());
- }
- message.payload().resetReaderIndex();
- }*/
+ if (!handlerInvoked && client.getDefaultHandler() != null) {
+ client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
+ }
message.payload().release();
}
@@ -133,7 +125,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
case CONNECTION_ACCEPTED:
this.connectFuture.setSuccess(new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture()));
- this.client.getPendingSubscribtions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> {
+ this.client.getPendingSubscriptions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> {
channel.write(e.getValue().getSubscribeMessage());
e.getValue().setSent(true);
});
@@ -148,6 +140,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
}
});
channel.flush();
+ if (this.client.isReconnect()) {
+ this.client.onSuccessfulReconnect();
+ }
break;
case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
@@ -163,19 +158,19 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
}
private void handleSubAck(MqttSubAckMessage message) {
- MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscribtions().remove(message.variableHeader().messageId());
+ MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
if (pendingSubscription == null) {
return;
}
pendingSubscription.onSubackReceived();
for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
- MqttSubscribtion subscribtion = new MqttSubscribtion(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
+ MqttSubscription subscribtion = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);
this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion);
}
this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
- this.client.getServerSubscribtions().add(pendingSubscription.getTopic());
+ this.client.getServerSubscriptions().add(pendingSubscription.getTopic());
if (!pendingSubscription.getFuture().isDone()) {
pendingSubscription.getFuture().setSuccess(null);
@@ -220,7 +215,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
return;
}
unsubscribtion.onUnsubackReceived();
- this.client.getServerSubscribtions().remove(unsubscribtion.getTopic());
+ this.client.getServerSubscriptions().remove(unsubscribtion.getTopic());
unsubscribtion.getFuture().setSuccess(null);
this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
index 3914105..d43ce55 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
@@ -40,23 +40,26 @@ import java.util.concurrent.atomic.AtomicInteger;
@SuppressWarnings({"WeakerAccess", "unused"})
final class MqttClientImpl implements MqttClient {
- private final Set<String> serverSubscribtions = new HashSet<>();
+ private final Set<String> serverSubscriptions = new HashSet<>();
private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>();
private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
- private final HashMultimap<String, MqttSubscribtion> subscriptions = HashMultimap.create();
- private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscribtions = new IntObjectHashMap<>();
+ private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
+ private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscriptions = new IntObjectHashMap<>();
private final Set<String> pendingSubscribeTopics = new HashSet<>();
- private final HashMultimap<MqttHandler, MqttSubscribtion> handlerToSubscribtion = HashMultimap.create();
+ private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create();
private final AtomicInteger nextMessageId = new AtomicInteger(1);
private final MqttClientConfig clientConfig;
+ private final MqttHandler defaultHandler;
+
private EventLoopGroup eventLoop;
- private Channel channel;
+ private volatile Channel channel;
- private boolean disconnected = false;
+ private volatile boolean disconnected = false;
+ private volatile boolean reconnect = false;
private String host;
private int port;
private MqttClientCallback callback;
@@ -65,8 +68,9 @@ final class MqttClientImpl implements MqttClient {
/**
* Construct the MqttClientImpl with default config
*/
- public MqttClientImpl() {
+ public MqttClientImpl(MqttHandler defaultHandler) {
this.clientConfig = new MqttClientConfig();
+ this.defaultHandler = defaultHandler;
}
/**
@@ -75,8 +79,9 @@ final class MqttClientImpl implements MqttClient {
*
* @param clientConfig The config object to use while looking for settings
*/
- public MqttClientImpl(MqttClientConfig clientConfig) {
+ public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) {
this.clientConfig = clientConfig;
+ this.defaultHandler = defaultHandler;
}
/**
@@ -100,12 +105,15 @@ final class MqttClientImpl implements MqttClient {
*/
@Override
public Future<MqttConnectResult> connect(String host, int port) {
+ return connect(host, port, false);
+ }
+
+ private Future<MqttConnectResult> connect(String host, int port, boolean reconnect) {
if (this.eventLoop == null) {
this.eventLoop = new NioEventLoopGroup();
}
this.host = host;
this.port = port;
-
Promise<MqttConnectResult> connectFuture = new DefaultPromise<>(this.eventLoop.next());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(this.eventLoop);
@@ -113,22 +121,47 @@ final class MqttClientImpl implements MqttClient {
bootstrap.remoteAddress(host, port);
bootstrap.handler(new MqttChannelInitializer(connectFuture, host, port, clientConfig.getSslContext()));
ChannelFuture future = bootstrap.connect();
+
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
MqttClientImpl.this.channel = f.channel();
- } else if (clientConfig.isReconnect() && !disconnected) {
- eventLoop.schedule((Runnable) () -> connect(host, port), 1L, TimeUnit.SECONDS);
+ MqttClientImpl.this.channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> {
+ if (isConnected()) {
+ return;
+ }
+ ChannelClosedException e = new ChannelClosedException("Channel is closed!");
+ if (callback != null) {
+ callback.connectionLost(e);
+ }
+ pendingSubscriptions.clear();
+ serverSubscriptions.clear();
+ subscriptions.clear();
+ pendingServerUnsubscribes.clear();
+ qos2PendingIncomingPublishes.clear();
+ pendingPublishes.clear();
+ pendingSubscribeTopics.clear();
+ handlerToSubscribtion.clear();
+ scheduleConnectIfRequired(host, port, true);
+ });
+ } else {
+ scheduleConnectIfRequired(host, port, reconnect);
}
});
return connectFuture;
}
+ private void scheduleConnectIfRequired(String host, int port, boolean reconnect) {
+ if (clientConfig.isReconnect() && !disconnected) {
+ if (reconnect) {
+ this.reconnect = true;
+ }
+ eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), 1L, TimeUnit.SECONDS);
+ }
+ }
+
@Override
public boolean isConnected() {
- if (!disconnected) {
- return channel == null ? false : channel.isActive();
- };
- return false;
+ return !disconnected && channel != null && channel.isActive();
}
@Override
@@ -183,7 +216,7 @@ final class MqttClientImpl implements MqttClient {
*/
@Override
public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) {
- return createSubscribtion(topic, handler, false, qos);
+ return createSubscription(topic, handler, false, qos);
}
/**
@@ -210,7 +243,7 @@ final class MqttClientImpl implements MqttClient {
*/
@Override
public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) {
- return createSubscribtion(topic, handler, true, qos);
+ return createSubscription(topic, handler, true, qos);
}
/**
@@ -224,7 +257,7 @@ final class MqttClientImpl implements MqttClient {
@Override
public Future<Void> off(String topic, MqttHandler handler) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
- for (MqttSubscribtion subscribtion : this.handlerToSubscribtion.get(handler)) {
+ for (MqttSubscription subscribtion : this.handlerToSubscribtion.get(handler)) {
this.subscriptions.remove(topic, subscribtion);
}
this.handlerToSubscribtion.removeAll(handler);
@@ -242,9 +275,9 @@ final class MqttClientImpl implements MqttClient {
@Override
public Future<Void> off(String topic) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
- ImmutableSet<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
- for (MqttSubscribtion subscribtion : subscribtions) {
- for (MqttSubscribtion handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
+ ImmutableSet<MqttSubscription> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
+ for (MqttSubscription subscribtion : subscribtions) {
+ for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
this.subscriptions.remove(topic, handSub);
}
this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion);
@@ -310,7 +343,7 @@ final class MqttClientImpl implements MqttClient {
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
if (channelFuture != null) {
- pendingPublish.setSent(channelFuture != null);
+ pendingPublish.setSent(true);
if (channelFuture.cause() != null) {
future.setFailure(channelFuture.cause());
return future;
@@ -352,6 +385,15 @@ final class MqttClientImpl implements MqttClient {
///////////////////////////////////////////// PRIVATE API /////////////////////////////////////////////
+ public boolean isReconnect() {
+ return reconnect;
+ }
+
+ public void onSuccessfulReconnect() {
+ callback.onSuccessfulReconnect();
+ }
+
+
ChannelFuture sendAndFlushPacket(Object message) {
if (this.channel == null) {
return null;
@@ -359,11 +401,7 @@ final class MqttClientImpl implements MqttClient {
if (this.channel.isActive()) {
return this.channel.writeAndFlush(message);
}
- ChannelClosedException e = new ChannelClosedException("Channel is closed");
- if (callback != null) {
- callback.connectionLost(e);
- }
- return this.channel.newFailedFuture(e);
+ return this.channel.newFailedFuture(new ChannelClosedException("Channel is closed!"));
}
private MqttMessageIdVariableHeader getNewMessageId() {
@@ -371,16 +409,16 @@ final class MqttClientImpl implements MqttClient {
return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement());
}
- private Future<Void> createSubscribtion(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
+ private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
if (this.pendingSubscribeTopics.contains(topic)) {
- Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscribtions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
+ Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
if (subscribtionEntry.isPresent()) {
subscribtionEntry.get().getValue().addHandler(handler, once);
return subscribtionEntry.get().getValue().getFuture();
}
}
- if (this.serverSubscribtions.contains(topic)) {
- MqttSubscribtion subscribtion = new MqttSubscribtion(topic, handler, once);
+ if (this.serverSubscriptions.contains(topic)) {
+ MqttSubscription subscribtion = new MqttSubscription(topic, handler, once);
this.subscriptions.put(topic, subscribtion);
this.handlerToSubscribtion.put(handler, subscribtion);
return this.channel.newSucceededFuture();
@@ -395,7 +433,7 @@ final class MqttClientImpl implements MqttClient {
final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);
pendingSubscribtion.addHandler(handler, once);
- this.pendingSubscribtions.put(variableHeader.messageId(), pendingSubscribtion);
+ this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscribtion);
this.pendingSubscribeTopics.add(topic);
pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
@@ -405,7 +443,7 @@ final class MqttClientImpl implements MqttClient {
}
private void checkSubscribtions(String topic, Promise<Void> promise) {
- if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscribtions.contains(topic)) {
+ if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscriptions.contains(topic)) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = getNewMessageId();
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
@@ -421,11 +459,11 @@ final class MqttClientImpl implements MqttClient {
}
}
- IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscribtions() {
- return pendingSubscribtions;
+ IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscriptions() {
+ return pendingSubscriptions;
}
- HashMultimap<String, MqttSubscribtion> getSubscriptions() {
+ HashMultimap<String, MqttSubscription> getSubscriptions() {
return subscriptions;
}
@@ -433,12 +471,12 @@ final class MqttClientImpl implements MqttClient {
return pendingSubscribeTopics;
}
- HashMultimap<MqttHandler, MqttSubscribtion> getHandlerToSubscribtion() {
+ HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscribtion() {
return handlerToSubscribtion;
}
- Set<String> getServerSubscribtions() {
- return serverSubscribtions;
+ Set<String> getServerSubscriptions() {
+ return serverSubscriptions;
}
IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() {
@@ -481,4 +519,9 @@ final class MqttClientImpl implements MqttClient {
ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture));
}
}
+
+ MqttHandler getDefaultHandler() {
+ return defaultHandler;
+ }
+
}