thingsboard-aplcache

Improved GRPC callbacks

11/7/2018 11:55:41 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 091b504..a59e85c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -61,6 +61,7 @@ import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.component.ComponentDiscoveryService;
 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
 import org.thingsboard.server.service.mail.MailExecutorService;
@@ -188,6 +189,10 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
+    private ClusterRpcCallbackExecutorService clusterRpcCallbackExecutor;
+
+    @Autowired
+    @Getter
     private DbCallbackExecutorService dbCallbackExecutor;
 
     @Autowired
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 760d4a6..669ed74 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,10 +17,12 @@ package org.thingsboard.server.actors.rpc;
 
 import akka.actor.ActorRef;
 import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
+import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
 
 /**
  * @author Andrew Shvayka
@@ -28,12 +30,14 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
 @Slf4j
 public class BasicRpcSessionListener implements GrpcSessionListener {
 
+    private final ClusterRpcCallbackExecutorService callbackExecutorService;
     private final ActorService service;
     private final ActorRef manager;
     private final ActorRef self;
 
-    BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
-        this.service = service;
+    BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) {
+        this.service = context.getActorService();
+        this.callbackExecutorService = context.getClusterRpcCallbackExecutor();
         this.manager = manager;
         this.self = self;
     }
@@ -55,7 +59,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
     @Override
     public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
         log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
-        service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
+        callbackExecutorService.execute(() -> {
+            try {
+                service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
+            } catch (Exception e) {
+                log.debug("[{}][{}] Failed to process cluster message: {}", session.getRemoteServer(), getType(session), clusterMessage, e);
+            }
+        });
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 9fa087e..b2931f8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -16,7 +16,9 @@
 package org.thingsboard.server.actors.rpc;
 
 import akka.actor.ActorRef;
+import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import lombok.extern.slf4j.Slf4j;
@@ -30,6 +32,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ServerType;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
+import scala.concurrent.duration.Duration;
 
 import java.util.*;
 
@@ -39,9 +42,7 @@ import java.util.*;
 public class RpcManagerActor extends ContextAwareActor {
 
     private final Map<ServerAddress, SessionActorInfo> sessionActors;
-
     private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
-
     private final ServerAddress instance;
 
     private RpcManagerActor(ActorSystemContext systemContext) {
@@ -63,7 +64,7 @@ public class RpcManagerActor extends ContextAwareActor {
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
+    public void onReceive(Object msg) {
         if (msg instanceof ClusterAPIProtos.ClusterMessage) {
             onMsg((ClusterAPIProtos.ClusterMessage) msg);
         } else if (msg instanceof RpcBroadcastMsg) {
@@ -163,6 +164,7 @@ public class RpcManagerActor extends ContextAwareActor {
         log.info("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
         SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
         if (sessionRef != null && context().sender() != null && context().sender().equals(sessionRef.actor)) {
+            context().stop(sessionRef.actor);
             sessionActors.remove(remoteAddress);
             pendingMsgs.remove(remoteAddress);
             if (reconnect) {
@@ -172,9 +174,13 @@ public class RpcManagerActor extends ContextAwareActor {
     }
 
     private void onCreateSessionRequest(RpcSessionCreateRequestMsg msg) {
-        ActorRef actorRef = createSessionActor(msg);
         if (msg.getRemoteAddress() != null) {
-            register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
+            if (!sessionActors.containsKey(msg.getRemoteAddress())) {
+                ActorRef actorRef = createSessionActor(msg);
+                register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
+            }
+        } else {
+            createSessionActor(msg);
         }
     }
 
@@ -193,7 +199,8 @@ public class RpcManagerActor extends ContextAwareActor {
     private ActorRef createSessionActor(RpcSessionCreateRequestMsg msg) {
         log.info("[{}] Creating session actor.", msg.getMsgUid());
         ActorRef actor = context().actorOf(
-                Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid())).withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
+                Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid()))
+                        .withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
         actor.tell(msg, context().self());
         return actor;
     }
@@ -210,4 +217,14 @@ public class RpcManagerActor extends ContextAwareActor {
             return new RpcManagerActor(context);
         }
     }
+
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+        return strategy;
+    }
+
+    private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+        log.warn("Unknown failure", t);
+        return SupervisorStrategy.resume();
+    });
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index 86509ca..2ca5c3e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -15,12 +15,10 @@
  */
 package org.thingsboard.server.actors.rpc;
 
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import io.grpc.Channel;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -38,15 +36,15 @@ import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CO
 /**
  * @author Andrew Shvayka
  */
+@Slf4j
 public class RpcSessionActor extends ContextAwareActor {
 
-    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     private final UUID sessionId;
     private GrpcSession session;
     private GrpcSessionListener listener;
 
-    public RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
+    private RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
         super(systemContext);
         this.sessionId = sessionId;
     }
@@ -58,7 +56,7 @@ public class RpcSessionActor extends ContextAwareActor {
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
+    public void onReceive(Object msg) {
         if (msg instanceof ClusterAPIProtos.ClusterMessage) {
             tell((ClusterAPIProtos.ClusterMessage) msg);
         } else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -67,19 +65,29 @@ public class RpcSessionActor extends ContextAwareActor {
     }
 
     private void tell(ClusterAPIProtos.ClusterMessage msg) {
-        session.sendMsg(msg);
+        if (session != null) {
+            session.sendMsg(msg);
+        } else {
+            log.trace("Failed to send message due to missing session!");
+        }
     }
 
     @Override
     public void postStop() {
-        log.info("Closing session -> {}", session.getRemoteServer());
-        session.close();
+        if (session != null) {
+            log.info("Closing session -> {}", session.getRemoteServer());
+            try {
+                session.close();
+            } catch (RuntimeException e) {
+                log.trace("Failed to close session!", e);
+            }
+        }
     }
 
     private void initSession(RpcSessionCreateRequestMsg msg) {
         log.info("[{}] Initializing session", context().self());
         ServerAddress remoteServer = msg.getRemoteAddress();
-        listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
+        listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
         if (msg.getRemoteAddress() == null) {
             // Server session
             session = new GrpcSession(listener);
@@ -113,7 +121,7 @@ public class RpcSessionActor extends ContextAwareActor {
         }
 
         @Override
-        public RpcSessionActor create() throws Exception {
+        public RpcSessionActor create() {
             return new RpcSessionActor(context, sessionId);
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 1c7e2d2..a53b88e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -37,7 +37,7 @@ public abstract class ContextAwareActor extends UntypedActor {
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
+    public void onReceive(Object msg) {
         if (log.isDebugEnabled()) {
             log.debug("Processing msg: {}", msg);
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java
new file mode 100644
index 0000000..2def8b0
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.executors;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ClusterRpcCallbackExecutorService extends AbstractListeningExecutor {
+
+    @Value("${actors.cluster.grpc_callback_thread_pool_size}")
+    private int grpcCallbackExecutorThreadPoolSize;
+
+    @Override
+    protected int getThreadPollSize() {
+        return grpcCallbackExecutorThreadPoolSize;
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
index cc8ecee..c78d7df 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
 
 import java.util.Collections;
+import java.util.UUID;
 
 import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
 
@@ -47,4 +48,10 @@ public class DefaultDeviceSessionCacheService implements DeviceSessionCacheServi
         return sessions;
     }
 
+    public static void main (String[] args){
+        UUID uuid = UUID.fromString("d5db434e-9cd2-4903-8b3b-421b2d93664d");
+        System.out.println(uuid.getMostSignificantBits());
+        System.out.println(uuid.getLeastSignificantBits());
+    }
+
 }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 60ce107..a2294d8 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -153,6 +153,8 @@ sql:
 
 # Actor system parameters
 actors:
+  cluster:
+    grpc_callback_thread_pool_size: "${ACTORS_CLUSTER_GRPC_CALLBACK_THREAD_POOL_SIZE:10}"
   tenant:
     create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}"
   session: