thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java 32(+24 -8)
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 2(+1 -1)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java 52(+50 -2)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java 96(+56 -40)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java 29(+29 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java 53(+53 -0)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java 2(+2 -0)
pom.xml 6(+6 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index d670db3..0f72230 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -134,42 +134,58 @@ public class LocalTransportService extends AbstractTransportService implements R
@Override
public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
+ if (checkLimits(sessionInfo, callback)) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
+ }
}
@Override
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 984fa28..8d65e3d 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -444,7 +444,7 @@ transport:
bind_port: "${MQTT_BIND_PORT:1883}"
timeout: "${MQTT_TIMEOUT:10000}"
netty:
- leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+ leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 89ecdd5..46a890f 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -249,7 +249,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
- ctx.close();
+ processDisconnect(ctx);
}
};
}
diff --git a/common/transport/transport-api/pom.xml b/common/transport/transport-api/pom.xml
index 3538e46..4ed6ed7 100644
--- a/common/transport/transport-api/pom.xml
+++ b/common/transport/transport-api/pom.xml
@@ -99,6 +99,10 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.vladimir-bukhtoyarov</groupId>
+ <artifactId>bucket4j-core</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index e299a02..265dacb 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -16,8 +16,13 @@
package org.thingsboard.server.common.transport.service;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
@@ -36,9 +41,20 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AbstractTransportService implements TransportService {
+ @Value("${transport.rate_limits.enabled}")
+ private boolean rateLimitEnabled;
+ @Value("${transport.rate_limits.tenant}")
+ private String perTenantLimitsConf;
+ @Value("${transport.rate_limits.tenant}")
+ private String perDevicesLimitsConf;
+
protected ScheduledExecutorService schedulerExecutor;
protected ExecutorService transportCallbackExecutor;
- protected ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
+ private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
+
+ //TODO: Implement cleanup of this maps.
+ private ConcurrentMap<TenantId, TbTransportRateLimits> perTenantLimits = new ConcurrentHashMap<>();
+ private ConcurrentMap<DeviceId, TbTransportRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
@Override
public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
@@ -53,7 +69,6 @@ public abstract class AbstractTransportService implements TransportService {
listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
deregisterSession(sessionInfo);
}, timeout, TimeUnit.MILLISECONDS);
-
}
@Override
@@ -61,6 +76,30 @@ public abstract class AbstractTransportService implements TransportService {
sessions.remove(toId(sessionInfo));
}
+ @Override
+ public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) {
+ if (!rateLimitEnabled) {
+ return true;
+ }
+ TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
+ TbTransportRateLimits rateLimits = perTenantLimits.computeIfAbsent(tenantId, id -> new TbTransportRateLimits(perTenantLimitsConf));
+ if (!rateLimits.tryConsume()) {
+ if (callback != null) {
+ callback.onError(new TbRateLimitsException(EntityType.TENANT));
+ }
+ return false;
+ }
+ DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
+ rateLimits = perDeviceLimits.computeIfAbsent(deviceId, id -> new TbTransportRateLimits(perDevicesLimitsConf));
+ if (!rateLimits.tryConsume()) {
+ if (callback != null) {
+ callback.onError(new TbRateLimitsException(EntityType.DEVICE));
+ }
+ return false;
+ }
+ return true;
+ }
+
protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) {
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
SessionMetaData md = sessions.get(sessionId);
@@ -101,11 +140,20 @@ public abstract class AbstractTransportService implements TransportService {
}
public void init() {
+ if (rateLimitEnabled) {
+ //Just checking the configuration parameters
+ new TbTransportRateLimits(perTenantLimitsConf);
+ new TbTransportRateLimits(perDevicesLimitsConf);
+ }
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
}
public void destroy() {
+ if (rateLimitEnabled) {
+ perTenantLimits.clear();
+ perDeviceLimits.clear();
+ }
if (schedulerExecutor != null) {
schedulerExecutor.shutdownNow();
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index aa3b42b..6774942 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -218,74 +218,90 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSessionEvent(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSessionEvent(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setPostTelemetry(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setPostTelemetry(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setPostAttributes(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setPostAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setGetAttributes(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setGetAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSubscribeToAttributes(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscribeToAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSubscribeToRPC(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscribeToRPC(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setToDeviceRPCCallResponse(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setToDeviceRPCCallResponse(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
@Override
public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setToServerRPCCallRequest(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
+ if (checkLimits(sessionInfo, callback)) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setToServerRPCCallRequest(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
+ }
}
private static class TransportCallbackAdaptor implements Callback {
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java
new file mode 100644
index 0000000..9d2669d
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbRateLimitsException.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.service;
+
+import org.thingsboard.server.common.data.EntityType;
+
+/**
+ * Created by ashvayka on 22.10.18.
+ */
+public class TbRateLimitsException extends Exception {
+ private final EntityType entityType;
+
+ TbRateLimitsException(EntityType entityType) {
+ this.entityType = entityType;
+ }
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java
new file mode 100644
index 0000000..d598734
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TbTransportRateLimits.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.service;
+
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.Bucket4j;
+import io.github.bucket4j.local.LocalBucket;
+import io.github.bucket4j.local.LocalBucketBuilder;
+
+import java.time.Duration;
+
+/**
+ * Created by ashvayka on 22.10.18.
+ */
+class TbTransportRateLimits {
+ private final LocalBucket bucket;
+
+ public TbTransportRateLimits(String limitsConfiguration) {
+ LocalBucketBuilder builder = Bucket4j.builder();
+ boolean initialized = false;
+ for (String limitSrc : limitsConfiguration.split(",")) {
+ long capacity = Long.parseLong(limitSrc.split(":")[0]);
+ long duration = Long.parseLong(limitSrc.split(":")[1]);
+ builder.addLimit(Bandwidth.simple(capacity, Duration.ofSeconds(duration)));
+ initialized = true;
+ }
+ if (initialized) {
+ bucket = builder.build();
+ } else {
+ throw new IllegalArgumentException("Failed to parse rate limits configuration: " + limitsConfiguration);
+ }
+
+
+ }
+
+ boolean tryConsume() {
+ return bucket.tryConsume(1);
+ }
+
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index caf178a..a47438f 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -43,6 +43,8 @@ public interface TransportService {
void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
+ boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback);
+
void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
pom.xml 6(+6 -0)
diff --git a/pom.xml b/pom.xml
index 0ac38cd..387e0ed 100755
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
<elasticsearch.version>5.0.2</elasticsearch.version>
<delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
<kafka.version>2.0.0</kafka.version>
+ <bucket4j.version>4.1.1</bucket4j.version>
</properties>
<modules>
@@ -778,6 +779,11 @@
<artifactId>delight-nashorn-sandbox</artifactId>
<version>${delight-nashorn-sandbox.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.vladimir-bukhtoyarov</groupId>
+ <artifactId>bucket4j-core</artifactId>
+ <version>${bucket4j.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index 1d425e4..a0e86bb 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -25,7 +25,7 @@ transport:
adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
timeout: "${MQTT_TIMEOUT:10000}"
netty:
- leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+ leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
@@ -43,6 +43,13 @@ transport:
key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
+ sessions:
+ max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}"
+ max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}"
+ rate_limits:
+ enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
+ tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
+ device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}"
#Quota parameters
quota: