thingsboard-memoizeit

Adding extra logs on transport layer

11/6/2018 12:57:50 PM

Details

diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index fa6e557..3986837 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -166,7 +166,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
         String topicName = mqttMsg.variableHeader().topicName();
         int msgId = mqttMsg.variableHeader().packetId();
-        log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+        log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
 
         if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
             if (gatewaySessionHandler != null) {
@@ -336,6 +336,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
         String userName = msg.payload().userName();
+        log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
         if (StringUtils.isEmpty(userName)) {
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
             ctx.close();
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index fad1954..4af594c 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -68,7 +68,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             reportActivityInternal(sessionInfo);
             doProcess(sessionInfo, msg, callback);
         }
@@ -76,7 +76,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             reportActivityInternal(sessionInfo);
             doProcess(sessionInfo, msg, callback);
         }
@@ -84,7 +84,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             reportActivityInternal(sessionInfo);
             doProcess(sessionInfo, msg, callback);
         }
@@ -92,7 +92,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             reportActivityInternal(sessionInfo);
             doProcess(sessionInfo, msg, callback);
         }
@@ -100,7 +100,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
             sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
             doProcess(sessionInfo, msg, callback);
@@ -109,7 +109,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
             sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
             doProcess(sessionInfo, msg, callback);
@@ -118,7 +118,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             reportActivityInternal(sessionInfo);
             doProcess(sessionInfo, msg, callback);
         }
@@ -126,7 +126,7 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
+        if (checkLimits(sessionInfo, msg, callback)) {
             reportActivityInternal(sessionInfo);
             doProcess(sessionInfo, msg, callback);
         }
@@ -196,7 +196,10 @@ public abstract class AbstractTransportService implements TransportService {
     }
 
     @Override
-    public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback) {
+    public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
+        if (log.isTraceEnabled()) {
+            log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
+        }
         if (!rateLimitEnabled) {
             return true;
         }
@@ -206,6 +209,9 @@ public abstract class AbstractTransportService implements TransportService {
             if (callback != null) {
                 callback.onError(new TbRateLimitsException(EntityType.TENANT));
             }
+            if (log.isTraceEnabled()) {
+                log.trace("[{}][{}] Tenant level rate limit detected: {}", toId(sessionInfo), tenantId, msg);
+            }
             return false;
         }
         DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
@@ -214,8 +220,12 @@ public abstract class AbstractTransportService implements TransportService {
             if (callback != null) {
                 callback.onError(new TbRateLimitsException(EntityType.DEVICE));
             }
+            if (log.isTraceEnabled()) {
+                log.trace("[{}][{}] Device level rate limit detected: {}", toId(sessionInfo), deviceId, msg);
+            }
             return false;
         }
+
         return true;
     }
 
@@ -250,11 +260,11 @@ public abstract class AbstractTransportService implements TransportService {
         }
     }
 
-    private UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
+    protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
         return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
     }
 
-    String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
+    protected String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
         return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
     }
 
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 4b11bf5..5c123a6 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -197,6 +197,7 @@ public class RemoteTransportService extends AbstractTransportService {
 
     @Override
     public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+        log.trace("Processing msg: {}", msg);
         AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
                 TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
                 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -204,6 +205,7 @@ public class RemoteTransportService extends AbstractTransportService {
 
     @Override
     public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+        log.trace("Processing msg: {}", msg);
         AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
                 TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
                 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -211,6 +213,7 @@ public class RemoteTransportService extends AbstractTransportService {
 
     @Override
     public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
+        log.trace("Processing msg: {}", msg);
         AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(),
                 TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
                 response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
@@ -218,6 +221,9 @@ public class RemoteTransportService extends AbstractTransportService {
 
     @Override
     public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+        if (log.isTraceEnabled()) {
+            log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
+        }
         ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
                 TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
                         .setSubscriptionInfo(msg).build()
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 93c0f41..5763c35 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -45,7 +45,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
         this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
     }
 
-
     public boolean isConnected() {
         return deviceInfo != null;
     }
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 8944e94..bb9cb4e 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -43,7 +43,7 @@ public interface TransportService {
     void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
                  TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
 
-    boolean checkLimits(SessionInfoProto sessionInfo, TransportServiceCallback<Void> callback);
+    boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
 
     void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);