thingsboard-aplcache

Details

diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
index 024e15a..c927c42 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
@@ -158,15 +158,15 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> 
     }
 
     private void handleSubAck(MqttSubAckMessage message) {
-        MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
+        MqttPendingSubscription pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
         if (pendingSubscription == null) {
             return;
         }
         pendingSubscription.onSubackReceived();
-        for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
-            MqttSubscription subscribtion = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
-            this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);
-            this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion);
+        for (MqttPendingSubscription.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
+            MqttSubscription subscription = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
+            this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscription);
+            this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscription);
         }
         this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
 
@@ -210,13 +210,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> 
     }
 
     private void handleUnsuback(MqttUnsubAckMessage message) {
-        MqttPendingUnsubscribtion unsubscribtion = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
-        if (unsubscribtion == null) {
+        MqttPendingUnsubscription unsubscription = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
+        if (unsubscription == null) {
             return;
         }
-        unsubscribtion.onUnsubackReceived();
-        this.client.getServerSubscriptions().remove(unsubscribtion.getTopic());
-        unsubscribtion.getFuture().setSuccess(null);
+        unsubscription.onUnsubackReceived();
+        this.client.getServerSubscriptions().remove(unsubscription.getTopic());
+        unsubscription.getFuture().setSuccess(null);
         this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
     }
 
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
index 7e57a0a..37efca3 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClient.java
@@ -92,7 +92,7 @@ public interface MqttClient {
 
     /**
      * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
-     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
      *
      * @param topic The topic filter to subscribe to
      * @param handler The handler to invoke when we receive a message
@@ -102,7 +102,7 @@ public interface MqttClient {
 
     /**
      * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
-     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
      *
      * @param topic The topic filter to subscribe to
      * @param handler The handler to invoke when we receive a message
@@ -112,7 +112,7 @@ public interface MqttClient {
     Future<Void> once(String topic, MqttHandler handler, MqttQoS qos);
 
     /**
-     * Remove the subscribtion for the given topic and handler
+     * Remove the subscription for the given topic and handler
      * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
      *
      * @param topic The topic to unsubscribe for
@@ -122,7 +122,7 @@ public interface MqttClient {
     Future<Void> off(String topic, MqttHandler handler);
 
     /**
-     * Remove all subscribtions for the given topic.
+     * Remove all subscriptions for the given topic.
      * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
      *
      * @param topic The topic to unsubscribe for
diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
index d43ce55..a5df846 100644
--- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
+++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java
@@ -41,11 +41,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 final class MqttClientImpl implements MqttClient {
 
     private final Set<String> serverSubscriptions = new HashSet<>();
-    private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>();
+    private final IntObjectHashMap<MqttPendingUnsubscription> pendingServerUnsubscribes = new IntObjectHashMap<>();
     private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
     private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
     private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
-    private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscriptions = new IntObjectHashMap<>();
+    private final IntObjectHashMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>();
     private final Set<String> pendingSubscribeTopics = new HashSet<>();
     private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create();
     private final AtomicInteger nextMessageId = new AtomicInteger(1);
@@ -221,7 +221,7 @@ final class MqttClientImpl implements MqttClient {
 
     /**
      * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
-     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
      *
      * @param topic   The topic filter to subscribe to
      * @param handler The handler to invoke when we receive a message
@@ -234,7 +234,7 @@ final class MqttClientImpl implements MqttClient {
 
     /**
      * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
-     * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
+     * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
      *
      * @param topic   The topic filter to subscribe to
      * @param handler The handler to invoke when we receive a message
@@ -247,7 +247,7 @@ final class MqttClientImpl implements MqttClient {
     }
 
     /**
-     * Remove the subscribtion for the given topic and handler
+     * Remove the subscription for the given topic and handler
      * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
      *
      * @param topic   The topic to unsubscribe for
@@ -257,8 +257,8 @@ final class MqttClientImpl implements MqttClient {
     @Override
     public Future<Void> off(String topic, MqttHandler handler) {
         Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
-        for (MqttSubscription subscribtion : this.handlerToSubscribtion.get(handler)) {
-            this.subscriptions.remove(topic, subscribtion);
+        for (MqttSubscription subscription : this.handlerToSubscribtion.get(handler)) {
+            this.subscriptions.remove(topic, subscription);
         }
         this.handlerToSubscribtion.removeAll(handler);
         this.checkSubscribtions(topic, future);
@@ -266,7 +266,7 @@ final class MqttClientImpl implements MqttClient {
     }
 
     /**
-     * Remove all subscribtions for the given topic.
+     * Remove all subscriptions for the given topic.
      * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
      *
      * @param topic The topic to unsubscribe for
@@ -275,12 +275,12 @@ final class MqttClientImpl implements MqttClient {
     @Override
     public Future<Void> off(String topic) {
         Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
-        ImmutableSet<MqttSubscription> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
-        for (MqttSubscription subscribtion : subscribtions) {
-            for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
+        ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic));
+        for (MqttSubscription subscription : subscriptions) {
+            for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscription.getHandler())) {
                 this.subscriptions.remove(topic, handSub);
             }
-            this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion);
+            this.handlerToSubscribtion.remove(subscription.getHandler(), subscription);
         }
         this.checkSubscribtions(topic, future);
         return future;
@@ -411,16 +411,16 @@ final class MqttClientImpl implements MqttClient {
 
     private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
         if (this.pendingSubscribeTopics.contains(topic)) {
-            Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
-            if (subscribtionEntry.isPresent()) {
-                subscribtionEntry.get().getValue().addHandler(handler, once);
-                return subscribtionEntry.get().getValue().getFuture();
+            Optional<Map.Entry<Integer, MqttPendingSubscription>> subscriptionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
+            if (subscriptionEntry.isPresent()) {
+                subscriptionEntry.get().getValue().addHandler(handler, once);
+                return subscriptionEntry.get().getValue().getFuture();
             }
         }
         if (this.serverSubscriptions.contains(topic)) {
-            MqttSubscription subscribtion = new MqttSubscription(topic, handler, once);
-            this.subscriptions.put(topic, subscribtion);
-            this.handlerToSubscribtion.put(handler, subscribtion);
+            MqttSubscription subscription = new MqttSubscription(topic, handler, once);
+            this.subscriptions.put(topic, subscription);
+            this.handlerToSubscribtion.put(handler, subscription);
             return this.channel.newSucceededFuture();
         }
 
@@ -431,13 +431,13 @@ final class MqttClientImpl implements MqttClient {
         MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription));
         MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
 
-        final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);
-        pendingSubscribtion.addHandler(handler, once);
-        this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscribtion);
+        final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message);
+        pendingSubscription.addHandler(handler, once);
+        this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription);
         this.pendingSubscribeTopics.add(topic);
-        pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
+        pendingSubscription.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
 
-        pendingSubscribtion.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+        pendingSubscription.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
 
         return future;
     }
@@ -449,9 +449,9 @@ final class MqttClientImpl implements MqttClient {
             MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
             MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
 
-            MqttPendingUnsubscribtion pendingUnsubscribtion = new MqttPendingUnsubscribtion(promise, topic, message);
-            this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscribtion);
-            pendingUnsubscribtion.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
+            MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message);
+            this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription);
+            pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
 
             this.sendAndFlushPacket(message);
         } else {
@@ -459,7 +459,7 @@ final class MqttClientImpl implements MqttClient {
         }
     }
 
-    IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscriptions() {
+    IntObjectHashMap<MqttPendingSubscription> getPendingSubscriptions() {
         return pendingSubscriptions;
     }
 
@@ -479,7 +479,7 @@ final class MqttClientImpl implements MqttClient {
         return serverSubscriptions;
     }
 
-    IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() {
+    IntObjectHashMap<MqttPendingUnsubscription> getPendingServerUnsubscribes() {
         return pendingServerUnsubscribes;
     }