thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 25(+19 -6)
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java 34(+22 -12)
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java 52(+17 -35)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java 60(+50 -10)
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java 126(+54 -72)
docker/tb-node/conf/logback.xml 2(+1 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 08a4454..a16bd78 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -201,7 +201,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
return entry -> {
- ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
ToDeviceRpcRequestBody body = request.getBody();
if (request.isOneway()) {
@@ -486,6 +485,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
+ if (subscriptionInfo.getAttributeSubscription()) {
+ attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+ }
+ if (subscriptionInfo.getRpcSubscription()) {
+ rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+ }
}
dumpSessions();
}
@@ -618,8 +623,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
private void restoreSessions() {
+ logger.debug("[{}] Restoring sessions from cache", deviceId);
TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
if (sessionsDump.getSerializedSize() == 0) {
+ logger.debug("[{}] No session information found", deviceId);
return;
}
for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
@@ -627,18 +634,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
UUID sessionId = getSessionId(sessionInfoProto);
SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
- SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
- sessions.put(sessionId, sessionInfoMetaData);
- if (subInfo.getAttributeSubscription()) {
- rpcSubscriptions.put(sessionId, sessionInfo);
- }
+ SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
+ sessions.put(sessionId, sessionMD);
if (subInfo.getAttributeSubscription()) {
attributeSubscriptions.put(sessionId, sessionInfo);
+ sessionMD.setSubscribedToAttributes(true);
+ }
+ if (subInfo.getRpcSubscription()) {
+ rpcSubscriptions.put(sessionId, sessionInfo);
+ sessionMD.setSubscribedToRPC(true);
}
+ logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
}
+ logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
}
private void dumpSessions() {
+ logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
sessions.forEach((uuid, sessionMD) -> {
if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
@@ -656,6 +668,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
.setSessionInfo(sessionInfoProto)
.setSubscriptionInfo(subscriptionInfoProto).build());
+ logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
});
systemContext.getDeviceSessionCacheService()
.put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 14bb636..1eba066 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -32,7 +32,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
private final ActorRef manager;
private final ActorRef self;
- public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
+ BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
this.service = service;
this.manager = manager;
this.self = self;
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 66174e1..31320ce 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -103,10 +103,10 @@ public class RpcManagerActor extends ContextAwareActor {
ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE);
SessionActorInfo session = sessionActors.get(address);
if (session != null) {
- log.debug("{} Forwarding msg to session actor", address);
+ log.debug("{} Forwarding msg to session actor: {}", address, msg);
session.getActor().tell(msg, ActorRef.noSender());
} else {
- log.debug("{} Storing msg to pending queue", address);
+ log.debug("{} Storing msg to pending queue: {}", address, msg);
Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
if (queue == null) {
queue = new LinkedList<>();
@@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
queue.add(msg);
}
} else {
- logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
+ logger.warning("Cluster msg doesn't have server address [{}]", msg);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index c9cf869..86509ca 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rpc;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import io.grpc.Channel;
+import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -88,8 +89,8 @@ public class RpcSessionActor extends ContextAwareActor {
systemContext.getRpcService().onSessionCreated(msg.getMsgUid(), session.getInputStream());
} else {
// Client session
- Channel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext(true).build();
- session = new GrpcSession(remoteServer, listener);
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext().build();
+ session = new GrpcSession(remoteServer, listener, channel);
session.initInputStream();
ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index f822e4e..721d828 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -108,7 +108,7 @@ public class TenantActor extends RuleChainManagerActor {
@Override
protected void broadcast(Object msg) {
super.broadcast(msg);
- deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index 55cf52c..dd0a995 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.server.service.cluster.rpc;
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -34,6 +36,7 @@ public final class GrpcSession implements Closeable {
private final UUID sessionId;
private final boolean client;
private final GrpcSessionListener listener;
+ private final ManagedChannel channel;
private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
@@ -41,10 +44,10 @@ public final class GrpcSession implements Closeable {
private ServerAddress remoteServer;
public GrpcSession(GrpcSessionListener listener) {
- this(null, listener);
+ this(null, listener, null);
}
- public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener) {
+ public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) {
this.sessionId = UUID.randomUUID();
this.listener = listener;
if (remoteServer != null) {
@@ -54,6 +57,7 @@ public final class GrpcSession implements Closeable {
} else {
this.client = false;
}
+ this.channel = channel;
}
public void initInputStream() {
@@ -105,5 +109,8 @@ public final class GrpcSession implements Closeable {
} catch (IllegalStateException e) {
log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
}
+ if (channel != null) {
+ channel.shutdownNow();
+ }
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index f3ceed2..5a1e559 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -30,7 +30,6 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.rule.engine.api.NodeDefinition;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbRelationTypes;
-import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.dao.component.ComponentDescriptorService;
@@ -52,6 +51,7 @@ import java.util.Set;
@Slf4j
public class AnnotationComponentDiscoveryService implements ComponentDiscoveryService {
+ public static final int MAX_OPTIMISITC_RETRIES = 3;
@Value("${plugins.scan_packages}")
private String[] scanPackages;
@@ -81,17 +81,27 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
private void registerRuleNodeComponents() {
Set<BeanDefinition> ruleNodeBeanDefinitions = getBeanDefinitions(RuleNode.class);
for (BeanDefinition def : ruleNodeBeanDefinitions) {
- try {
- String clazzName = def.getBeanClassName();
- Class<?> clazz = Class.forName(clazzName);
- RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
- ComponentType type = ruleNodeAnnotation.type();
- ComponentDescriptor component = scanAndPersistComponent(def, type);
- components.put(component.getClazz(), component);
- componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
- } catch (Exception e) {
- log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
- throw new RuntimeException(e);
+ int retryCount = 0;
+ Exception cause = null;
+ while (retryCount < MAX_OPTIMISITC_RETRIES) {
+ try {
+ String clazzName = def.getBeanClassName();
+ Class<?> clazz = Class.forName(clazzName);
+ RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
+ ComponentType type = ruleNodeAnnotation.type();
+ ComponentDescriptor component = scanAndPersistComponent(def, type);
+ components.put(component.getClazz(), component);
+ componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
+ break;
+ } catch (Exception e) {
+ log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
+ cause = e;
+ retryCount++;
+ }
+ }
+ if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) {
+ log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), cause.getMessage(), cause);
+ throw new RuntimeException(cause);
}
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index d3782e8..3d12938 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -133,66 +133,48 @@ public class LocalTransportService extends AbstractTransportService implements R
}
@Override
- public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
}
@Override
public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
- }
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+ forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
}
@Override
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 610a490..bd42f31 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import java.util.List;
@@ -75,7 +76,11 @@ public class TBKafkaProducerTemplate<T> {
CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
result.all().get();
} catch (Exception e) {
- log.trace("Failed to create topic: {}", e.getMessage(), e);
+ if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
+ log.trace("[{}] Topic already exists: ", defaultTopic);
+ } else {
+ log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+ }
}
//Maybe this should not be cached, but we don't plan to change size of partitions
this.partitionInfoMap = new ConcurrentHashMap<>();
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index 3bb84ec..fad1954 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -68,44 +68,68 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- reportActivityInternal(sessionInfo);
+ if (checkLimits(sessionInfo, callback)) {
+ reportActivityInternal(sessionInfo);
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- reportActivityInternal(sessionInfo);
+ if (checkLimits(sessionInfo, callback)) {
+ reportActivityInternal(sessionInfo);
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- reportActivityInternal(sessionInfo);
+ if (checkLimits(sessionInfo, callback)) {
+ reportActivityInternal(sessionInfo);
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- reportActivityInternal(sessionInfo);
+ if (checkLimits(sessionInfo, callback)) {
+ reportActivityInternal(sessionInfo);
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
- sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
+ if (checkLimits(sessionInfo, callback)) {
+ SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+ sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
- sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
+ if (checkLimits(sessionInfo, callback)) {
+ SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+ sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- reportActivityInternal(sessionInfo);
+ if (checkLimits(sessionInfo, callback)) {
+ reportActivityInternal(sessionInfo);
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- reportActivityInternal(sessionInfo);
+ if (checkLimits(sessionInfo, callback)) {
+ reportActivityInternal(sessionInfo);
+ doProcess(sessionInfo, msg, callback);
+ }
}
@Override
@@ -113,6 +137,22 @@ public abstract class AbstractTransportService implements TransportService {
reportActivityInternal(sessionInfo);
}
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
+
+ protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
+
private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
UUID sessionId = toId(sessionInfo);
SessionMetaData sessionMetaData = sessions.get(sessionId);
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 0910961..4b11bf5 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -217,102 +217,84 @@ public class RemoteTransportService extends AbstractTransportService {
}
@Override
- public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSessionEvent(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscriptionInfo(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSubscriptionInfo(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSessionEvent(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setPostTelemetry(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setPostTelemetry(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setPostAttributes(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setPostAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setGetAttributes(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setGetAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSubscribeToAttributes(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscribeToAttributes(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setSubscribeToRPC(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setSubscribeToRPC(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setToDeviceRPCCallResponse(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setToDeviceRPCCallResponse(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
- public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
- if (checkLimits(sessionInfo, callback)) {
- ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
- TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
- .setToServerRPCCallRequest(msg).build()
- ).build();
- send(sessionInfo, toRuleEngineMsg, callback);
- }
+ protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+ ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+ TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+ .setToServerRPCCallRequest(msg).build()
+ ).build();
+ send(sessionInfo, toRuleEngineMsg, callback);
}
private static class TransportCallbackAdaptor implements Callback {
docker/tb-node/conf/logback.xml 2(+1 -1)
diff --git a/docker/tb-node/conf/logback.xml b/docker/tb-node/conf/logback.xml
index 1c69f53..8bf38eb 100644
--- a/docker/tb-node/conf/logback.xml
+++ b/docker/tb-node/conf/logback.xml
@@ -40,7 +40,7 @@
</encoder>
</appender>
- <logger name="org.thingsboard.server" level="INFO" />
+ <logger name="org.thingsboard.server" level="TRACE" />
<logger name="akka" level="INFO" />
<root level="INFO">