thingsboard-aplcache

Rate limits draft

10/22/2018 2:34:44 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index d670db3..0f72230 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -134,42 +134,58 @@ public class LocalTransportService extends AbstractTransportService implements R
 
     @Override
     public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
-        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
+        }
     }
 
     @Override
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 984fa28..8d65e3d 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -444,7 +444,7 @@ transport:
     bind_port: "${MQTT_BIND_PORT:1883}"
     timeout: "${MQTT_TIMEOUT:10000}"
     netty:
-      leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+      leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
       boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
       worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
       max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 89ecdd5..46a890f 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -249,7 +249,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             @Override
             public void onError(Throwable e) {
                 log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
-                ctx.close();
+                processDisconnect(ctx);
             }
         };
     }
diff --git a/common/transport/transport-api/pom.xml b/common/transport/transport-api/pom.xml
index 3538e46..4ed6ed7 100644
--- a/common/transport/transport-api/pom.xml
+++ b/common/transport/transport-api/pom.xml
@@ -99,6 +99,10 @@
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.vladimir-bukhtoyarov</groupId>
+            <artifactId>bucket4j-core</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index e299a02..265dacb 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -16,8 +16,13 @@
 package org.thingsboard.server.common.transport.service;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.transport.SessionMsgListener;
 import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.gen.transport.TransportProtos;
 
 import java.util.UUID;
@@ -36,9 +41,20 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public abstract class AbstractTransportService implements TransportService {
 
+    @Value("${transport.rate_limits.enabled}")
+    private boolean rateLimitEnabled;
+    @Value("${transport.rate_limits.tenant}")
+    private String perTenantLimitsConf;
+    @Value("${transport.rate_limits.tenant}")
+    private String perDevicesLimitsConf;
+
     protected ScheduledExecutorService schedulerExecutor;
     protected ExecutorService transportCallbackExecutor;
-    protected ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
+    private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
+
+    //TODO: Implement cleanup of this maps.
+    private ConcurrentMap<TenantId, TbTransportRateLimits> perTenantLimits = new ConcurrentHashMap<>();
+    private ConcurrentMap<DeviceId, TbTransportRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
 
     @Override
     public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
@@ -53,7 +69,6 @@ public abstract class AbstractTransportService implements TransportService {
             listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
             deregisterSession(sessionInfo);
         }, timeout, TimeUnit.MILLISECONDS);
-
     }
 
     @Override
@@ -61,6 +76,30 @@ public abstract class AbstractTransportService implements TransportService {
         sessions.remove(toId(sessionInfo));
     }
 
+    @Override
+    public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) {
+        if (!rateLimitEnabled) {
+            return true;
+        }
+        TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
+        TbTransportRateLimits rateLimits = perTenantLimits.computeIfAbsent(tenantId, id -> new TbTransportRateLimits(perTenantLimitsConf));
+        if (!rateLimits.tryConsume()) {
+            if (callback != null) {
+                callback.onError(new TbRateLimitsException(EntityType.TENANT));
+            }
+            return false;
+        }
+        DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
+        rateLimits = perDeviceLimits.computeIfAbsent(deviceId, id -> new TbTransportRateLimits(perDevicesLimitsConf));
+        if (!rateLimits.tryConsume()) {
+            if (callback != null) {
+                callback.onError(new TbRateLimitsException(EntityType.DEVICE));
+            }
+            return false;
+        }
+        return true;
+    }
+
     protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) {
         UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
         SessionMetaData md = sessions.get(sessionId);
@@ -101,11 +140,20 @@ public abstract class AbstractTransportService implements TransportService {
     }
 
     public void init() {
+        if (rateLimitEnabled) {
+            //Just checking the configuration parameters
+            new TbTransportRateLimits(perTenantLimitsConf);
+            new TbTransportRateLimits(perDevicesLimitsConf);
+        }
         this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
         this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
     }
 
     public void destroy() {
+        if (rateLimitEnabled) {
+            perTenantLimits.clear();
+            perDeviceLimits.clear();
+        }
         if (schedulerExecutor != null) {
             schedulerExecutor.shutdownNow();
         }
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index aa3b42b..6774942 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -218,74 +218,90 @@ public class RemoteTransportService extends AbstractTransportService {
 
     @Override
     public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setSessionEvent(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setSessionEvent(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setPostTelemetry(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setPostTelemetry(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setPostAttributes(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setPostAttributes(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setGetAttributes(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setGetAttributes(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setSubscribeToAttributes(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setSubscribeToAttributes(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setSubscribeToRPC(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setSubscribeToRPC(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setToDeviceRPCCallResponse(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setToDeviceRPCCallResponse(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
-        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                        .setToServerRPCCallRequest(msg).build()
-        ).build();
-        send(sessionInfo, toRuleEngineMsg, callback);
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setToServerRPCCallRequest(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
     }
 
     private static class TransportCallbackAdaptor implements Callback {
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java
new file mode 100644
index 0000000..9d2669d
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.service;
+
+import org.thingsboard.server.common.data.EntityType;
+
+/**
+ * Created by ashvayka on 22.10.18.
+ */
+public class TbRateLimitsException extends Exception {
+    private final EntityType entityType;
+
+    TbRateLimitsException(EntityType entityType) {
+        this.entityType = entityType;
+    }
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java
new file mode 100644
index 0000000..d598734
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.service;
+
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.Bucket4j;
+import io.github.bucket4j.local.LocalBucket;
+import io.github.bucket4j.local.LocalBucketBuilder;
+
+import java.time.Duration;
+
+/**
+ * Created by ashvayka on 22.10.18.
+ */
+class TbTransportRateLimits {
+    private final LocalBucket bucket;
+
+    public TbTransportRateLimits(String limitsConfiguration) {
+        LocalBucketBuilder builder = Bucket4j.builder();
+        boolean initialized = false;
+        for (String limitSrc : limitsConfiguration.split(",")) {
+            long capacity = Long.parseLong(limitSrc.split(":")[0]);
+            long duration = Long.parseLong(limitSrc.split(":")[1]);
+            builder.addLimit(Bandwidth.simple(capacity, Duration.ofSeconds(duration)));
+            initialized = true;
+        }
+        if (initialized) {
+            bucket = builder.build();
+        } else {
+            throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
+        }
+
+
+    }
+
+    boolean tryConsume() {
+        return bucket.tryConsume(1);
+    }
+
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index caf178a..a47438f 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -43,6 +43,8 @@ public interface TransportService {
     void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
                  TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
 
+    boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback);
+
     void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
 
     void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);

pom.xml 6(+6 -0)

diff --git a/pom.xml b/pom.xml
index 0ac38cd..387e0ed 100755
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
         <elasticsearch.version>5.0.2</elasticsearch.version>
         <delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
         <kafka.version>2.0.0</kafka.version>
+        <bucket4j.version>4.1.1</bucket4j.version>
     </properties>
 
     <modules>
@@ -778,6 +779,11 @@
                 <artifactId>delight-nashorn-sandbox</artifactId>
                 <version>${delight-nashorn-sandbox.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.vladimir-bukhtoyarov</groupId>
+                <artifactId>bucket4j-core</artifactId>
+                <version>${bucket4j.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index 1d425e4..a0e86bb 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -25,7 +25,7 @@ transport:
     adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
     timeout: "${MQTT_TIMEOUT:10000}"
     netty:
-      leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+      leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
       boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
       worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
       max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
@@ -43,6 +43,13 @@ transport:
       key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
       # Type of the key store
       key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
+  sessions:
+    max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}"
+    max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}"
+  rate_limits:
+    enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
+    tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
+    device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
 
 #Quota parameters
 quota: