thingsboard-memoizeit
Changes
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 3(+2 -1)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java 32(+21 -11)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java 6(+6 -0)
Details
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index fa6e557..3986837 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -166,7 +166,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().packetId();
- log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+ log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
@@ -336,6 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String userName = msg.payload().userName();
+ log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index fad1954..4af594c 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -68,7 +68,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -76,7 +76,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -84,7 +84,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -92,7 +92,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -100,7 +100,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
doProcess(sessionInfo, msg, callback);
@@ -109,7 +109,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
doProcess(sessionInfo, msg, callback);
@@ -118,7 +118,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -126,7 +126,7 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
+ if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
doProcess(sessionInfo, msg, callback);
}
@@ -196,7 +196,10 @@ public abstract class AbstractTransportService implements TransportService {
}
@Override
- public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) {
+ public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
+ }
if (!rateLimitEnabled) {
return true;
}
@@ -206,6 +209,9 @@ public abstract class AbstractTransportService implements TransportService {
if (callback != null) {
callback.onError(new TbRateLimitsException(EntityType.TENANT));
}
+ if (log.isTraceEnabled()) {
+ log.trace("[{}][{}] Tenant level rate limit detected: {}", toId(sessionInfo), tenantId, msg);
+ }
return false;
}
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
@@ -214,8 +220,12 @@ public abstract class AbstractTransportService implements TransportService {
if (callback != null) {
callback.onError(new TbRateLimitsException(EntityType.DEVICE));
}
+ if (log.isTraceEnabled()) {
+ log.trace("[{}][{}] Device level rate limit detected: {}", toId(sessionInfo), deviceId, msg);
+ }
return false;
}
+
return true;
}
@@ -250,11 +260,11 @@ public abstract class AbstractTransportService implements TransportService {
}
}
- private UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
+ protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
}
- String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
+ protected String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 4b11bf5..5c123a6 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -197,6 +197,7 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -204,6 +205,7 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -211,6 +213,7 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
+ log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(),
TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -218,6 +221,9 @@ public class RemoteTransportService extends AbstractTransportService {
@Override
public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
+ }
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscriptionInfo(msg).build()
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 93c0f41..5763c35 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -45,7 +45,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
}
-
public boolean isConnected() {
return deviceInfo != null;
}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 8944e94..bb9cb4e 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -43,7 +43,7 @@ public interface TransportService {
void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
- boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback);
+ boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);