diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
index 024e15a..c927c42 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
@@ -158,15 +158,15 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
}
private void handleSubAck(MqttSubAckMessage message) {
- MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
+ MqttPendingSubscription pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
if (pendingSubscription == null) {
return;
}
pendingSubscription.onSubackReceived();
- for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
- MqttSubscription subscribtion = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
- this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);
- this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion);
+ for (MqttPendingSubscription.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
+ MqttSubscription subscription = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
+ this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscription);
+ this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscription);
}
this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
@@ -210,13 +210,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
}
private void handleUnsuback(MqttUnsubAckMessage message) {
- MqttPendingUnsubscribtion unsubscribtion = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
- if (unsubscribtion == null) {
+ MqttPendingUnsubscription unsubscription = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
+ if (unsubscription == null) {
return;
}
- unsubscribtion.onUnsubackReceived();
- this.client.getServerSubscriptions().remove(unsubscribtion.getTopic());
- unsubscribtion.getFuture().setSuccess(null);
+ unsubscription.onUnsubackReceived();
+ this.client.getServerSubscriptions().remove(unsubscription.getTopic());
+ unsubscription.getFuture().setSuccess(null);
this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
}
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
index 7e57a0a..37efca3 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
@@ -92,7 +92,7 @@ public interface MqttClient {
/**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
- * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+ * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
@@ -102,7 +102,7 @@ public interface MqttClient {
/**
* Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
- * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+ * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
@@ -112,7 +112,7 @@ public interface MqttClient {
Future<Void> once(String topic, MqttHandler handler, MqttQoS qos);
/**
- * Remove the subscribtion for the given topic and handler
+ * Remove the subscription for the given topic and handler
* If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
*
* @param topic The topic to unsubscribe for
@@ -122,7 +122,7 @@ public interface MqttClient {
Future<Void> off(String topic, MqttHandler handler);
/**
- * Remove all subscribtions for the given topic.
+ * Remove all subscriptions for the given topic.
* If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
*
* @param topic The topic to unsubscribe for
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
index d43ce55..a5df846 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
@@ -41,11 +41,11 @@ import java.util.concurrent.atomic.AtomicInteger;
final class MqttClientImpl implements MqttClient {
private final Set<String> serverSubscriptions = new HashSet<>();
- private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>();
+ private final IntObjectHashMap<MqttPendingUnsubscription> pendingServerUnsubscribes = new IntObjectHashMap<>();
private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
- private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscriptions = new IntObjectHashMap<>();
+ private final IntObjectHashMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>();
private final Set<String> pendingSubscribeTopics = new HashSet<>();
private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create();
private final AtomicInteger nextMessageId = new AtomicInteger(1);
@@ -221,7 +221,7 @@ final class MqttClientImpl implements MqttClient {
/**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
- * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+ * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
@@ -234,7 +234,7 @@ final class MqttClientImpl implements MqttClient {
/**
* Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
- * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+ * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
@@ -247,7 +247,7 @@ final class MqttClientImpl implements MqttClient {
}
/**
- * Remove the subscribtion for the given topic and handler
+ * Remove the subscription for the given topic and handler
* If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
*
* @param topic The topic to unsubscribe for
@@ -257,8 +257,8 @@ final class MqttClientImpl implements MqttClient {
@Override
public Future<Void> off(String topic, MqttHandler handler) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
- for (MqttSubscription subscribtion : this.handlerToSubscribtion.get(handler)) {
- this.subscriptions.remove(topic, subscribtion);
+ for (MqttSubscription subscription : this.handlerToSubscribtion.get(handler)) {
+ this.subscriptions.remove(topic, subscription);
}
this.handlerToSubscribtion.removeAll(handler);
this.checkSubscribtions(topic, future);
@@ -266,7 +266,7 @@ final class MqttClientImpl implements MqttClient {
}
/**
- * Remove all subscribtions for the given topic.
+ * Remove all subscriptions for the given topic.
* If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
*
* @param topic The topic to unsubscribe for
@@ -275,12 +275,12 @@ final class MqttClientImpl implements MqttClient {
@Override
public Future<Void> off(String topic) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
- ImmutableSet<MqttSubscription> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
- for (MqttSubscription subscribtion : subscribtions) {
- for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
+ ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic));
+ for (MqttSubscription subscription : subscriptions) {
+ for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscription.getHandler())) {
this.subscriptions.remove(topic, handSub);
}
- this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion);
+ this.handlerToSubscribtion.remove(subscription.getHandler(), subscription);
}
this.checkSubscribtions(topic, future);
return future;
@@ -411,16 +411,16 @@ final class MqttClientImpl implements MqttClient {
private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
if (this.pendingSubscribeTopics.contains(topic)) {
- Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
- if (subscribtionEntry.isPresent()) {
- subscribtionEntry.get().getValue().addHandler(handler, once);
- return subscribtionEntry.get().getValue().getFuture();
+ Optional<Map.Entry<Integer, MqttPendingSubscription>> subscriptionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
+ if (subscriptionEntry.isPresent()) {
+ subscriptionEntry.get().getValue().addHandler(handler, once);
+ return subscriptionEntry.get().getValue().getFuture();
}
}
if (this.serverSubscriptions.contains(topic)) {
- MqttSubscription subscribtion = new MqttSubscription(topic, handler, once);
- this.subscriptions.put(topic, subscribtion);
- this.handlerToSubscribtion.put(handler, subscribtion);
+ MqttSubscription subscription = new MqttSubscription(topic, handler, once);
+ this.subscriptions.put(topic, subscription);
+ this.handlerToSubscribtion.put(handler, subscription);
return this.channel.newSucceededFuture();
}
@@ -431,13 +431,13 @@ final class MqttClientImpl implements MqttClient {
MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription));
MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
- final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);
- pendingSubscribtion.addHandler(handler, once);
- this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscribtion);
+ final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message);
+ pendingSubscription.addHandler(handler, once);
+ this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription);
this.pendingSubscribeTopics.add(topic);
- pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
+ pendingSubscription.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
- pendingSubscribtion.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+ pendingSubscription.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
return future;
}
@@ -449,9 +449,9 @@ final class MqttClientImpl implements MqttClient {
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
- MqttPendingUnsubscribtion pendingUnsubscribtion = new MqttPendingUnsubscribtion(promise, topic, message);
- this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscribtion);
- pendingUnsubscribtion.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+ MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message);
+ this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription);
+ pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
this.sendAndFlushPacket(message);
} else {
@@ -459,7 +459,7 @@ final class MqttClientImpl implements MqttClient {
}
}
- IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscriptions() {
+ IntObjectHashMap<MqttPendingSubscription> getPendingSubscriptions() {
return pendingSubscriptions;
}
@@ -479,7 +479,7 @@ final class MqttClientImpl implements MqttClient {
return serverSubscriptions;
}
- IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() {
+ IntObjectHashMap<MqttPendingUnsubscription> getPendingServerUnsubscribes() {
return pendingServerUnsubscribes;
}