thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java 32(+32 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 091b504..a59e85c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -61,6 +61,7 @@ import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
@@ -188,6 +189,10 @@ public class ActorSystemContext {
@Autowired
@Getter
+ private ClusterRpcCallbackExecutorService clusterRpcCallbackExecutor;
+
+ @Autowired
+ @Getter
private DbCallbackExecutorService dbCallbackExecutor;
@Autowired
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 760d4a6..669ed74 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,10 +17,12 @@ package org.thingsboard.server.actors.rpc;
import akka.actor.ActorRef;
import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
+import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
/**
* @author Andrew Shvayka
@@ -28,12 +30,14 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
@Slf4j
public class BasicRpcSessionListener implements GrpcSessionListener {
+ private final ClusterRpcCallbackExecutorService callbackExecutorService;
private final ActorService service;
private final ActorRef manager;
private final ActorRef self;
- BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
- this.service = service;
+ BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) {
+ this.service = context.getActorService();
+ this.callbackExecutorService = context.getClusterRpcCallbackExecutor();
this.manager = manager;
this.self = self;
}
@@ -55,7 +59,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
@Override
public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
- service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
+ callbackExecutorService.execute(() -> {
+ try {
+ service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
+ } catch (Exception e) {
+ log.debug("[{}][{}] Failed to process cluster message: {}", session.getRemoteServer(), getType(session), clusterMessage, e);
+ }
+ });
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 9fa087e..b2931f8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -16,7 +16,9 @@
package org.thingsboard.server.actors.rpc;
import akka.actor.ActorRef;
+import akka.actor.OneForOneStrategy;
import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import lombok.extern.slf4j.Slf4j;
@@ -30,6 +32,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
+import scala.concurrent.duration.Duration;
import java.util.*;
@@ -39,9 +42,7 @@ import java.util.*;
public class RpcManagerActor extends ContextAwareActor {
private final Map<ServerAddress, SessionActorInfo> sessionActors;
-
private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
-
private final ServerAddress instance;
private RpcManagerActor(ActorSystemContext systemContext) {
@@ -63,7 +64,7 @@ public class RpcManagerActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
+ public void onReceive(Object msg) {
if (msg instanceof ClusterAPIProtos.ClusterMessage) {
onMsg((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcBroadcastMsg) {
@@ -163,6 +164,7 @@ public class RpcManagerActor extends ContextAwareActor {
log.info("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
if (sessionRef != null && context().sender() != null && context().sender().equals(sessionRef.actor)) {
+ context().stop(sessionRef.actor);
sessionActors.remove(remoteAddress);
pendingMsgs.remove(remoteAddress);
if (reconnect) {
@@ -172,9 +174,13 @@ public class RpcManagerActor extends ContextAwareActor {
}
private void onCreateSessionRequest(RpcSessionCreateRequestMsg msg) {
- ActorRef actorRef = createSessionActor(msg);
if (msg.getRemoteAddress() != null) {
- register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
+ if (!sessionActors.containsKey(msg.getRemoteAddress())) {
+ ActorRef actorRef = createSessionActor(msg);
+ register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
+ }
+ } else {
+ createSessionActor(msg);
}
}
@@ -193,7 +199,8 @@ public class RpcManagerActor extends ContextAwareActor {
private ActorRef createSessionActor(RpcSessionCreateRequestMsg msg) {
log.info("[{}] Creating session actor.", msg.getMsgUid());
ActorRef actor = context().actorOf(
- Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid())).withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
+ Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid()))
+ .withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
actor.tell(msg, context().self());
return actor;
}
@@ -210,4 +217,14 @@ public class RpcManagerActor extends ContextAwareActor {
return new RpcManagerActor(context);
}
}
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return strategy;
+ }
+
+ private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+ log.warn("Unknown failure", t);
+ return SupervisorStrategy.resume();
+ });
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index 86509ca..2ca5c3e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -15,12 +15,10 @@
*/
package org.thingsboard.server.actors.rpc;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -38,15 +36,15 @@ import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CO
/**
* @author Andrew Shvayka
*/
+@Slf4j
public class RpcSessionActor extends ContextAwareActor {
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private final UUID sessionId;
private GrpcSession session;
private GrpcSessionListener listener;
- public RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
+ private RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
super(systemContext);
this.sessionId = sessionId;
}
@@ -58,7 +56,7 @@ public class RpcSessionActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
+ public void onReceive(Object msg) {
if (msg instanceof ClusterAPIProtos.ClusterMessage) {
tell((ClusterAPIProtos.ClusterMessage) msg);
} else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -67,19 +65,29 @@ public class RpcSessionActor extends ContextAwareActor {
}
private void tell(ClusterAPIProtos.ClusterMessage msg) {
- session.sendMsg(msg);
+ if (session != null) {
+ session.sendMsg(msg);
+ } else {
+ log.trace("Failed to send message due to missing session!");
+ }
}
@Override
public void postStop() {
- log.info("Closing session -> {}", session.getRemoteServer());
- session.close();
+ if (session != null) {
+ log.info("Closing session -> {}", session.getRemoteServer());
+ try {
+ session.close();
+ } catch (RuntimeException e) {
+ log.trace("Failed to close session!", e);
+ }
+ }
}
private void initSession(RpcSessionCreateRequestMsg msg) {
log.info("[{}] Initializing session", context().self());
ServerAddress remoteServer = msg.getRemoteAddress();
- listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
+ listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
if (msg.getRemoteAddress() == null) {
// Server session
session = new GrpcSession(listener);
@@ -113,7 +121,7 @@ public class RpcSessionActor extends ContextAwareActor {
}
@Override
- public RpcSessionActor create() throws Exception {
+ public RpcSessionActor create() {
return new RpcSessionActor(context, sessionId);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 1c7e2d2..a53b88e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -37,7 +37,7 @@ public abstract class ContextAwareActor extends UntypedActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
+ public void onReceive(Object msg) {
if (log.isDebugEnabled()) {
log.debug("Processing msg: {}", msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java
new file mode 100644
index 0000000..2def8b0
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.executors;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ClusterRpcCallbackExecutorService extends AbstractListeningExecutor {
+
+ @Value("${actors.cluster.grpc_callback_thread_pool_size}")
+ private int grpcCallbackExecutorThreadPoolSize;
+
+ @Override
+ protected int getThreadPollSize() {
+ return grpcCallbackExecutorThreadPoolSize;
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
index cc8ecee..c78d7df 100644
--- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
import java.util.Collections;
+import java.util.UUID;
import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
@@ -47,4 +48,10 @@ public class DefaultDeviceSessionCacheService implements DeviceSessionCacheServi
return sessions;
}
+ public static void main (String[] args){
+ UUID uuid = UUID.fromString("d5db434e-9cd2-4903-8b3b-421b2d93664d");
+ System.out.println(uuid.getMostSignificantBits());
+ System.out.println(uuid.getLeastSignificantBits());
+ }
+
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 60ce107..a2294d8 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -153,6 +153,8 @@ sql:
# Actor system parameters
actors:
+ cluster:
+ grpc_callback_thread_pool_size: "${ACTORS_CLUSTER_GRPC_CALLBACK_THREAD_POOL_SIZE:10}"
tenant:
create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}"
session: