thingsboard-aplcache

Merge pull request #78 from thingsboard/feature/dispatchers Separate

3/16/2017 6:35:41 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index bea51db..876b525 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -32,10 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -74,6 +71,10 @@ public final class PluginProcessingContext implements PluginContext {
         this.securityCtx = Optional.ofNullable(securityCtx);
     }
 
+    public void persistError(String method, Exception e) {
+        pluginCtx.persistError(method, e);
+    }
+
     @Override
     public void sendPluginRpcMsg(RpcMsg msg) {
         this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index 80c09b6..b09b72e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
@@ -73,6 +74,10 @@ public final class SharedPluginProcessingContext {
         return pluginId;
     }
 
+    public TenantId getPluginTenantId() {
+        return tenantId;
+    }
+
     public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
         forward(msg.getDeviceId(), msg, rpcService::tell);
     }
@@ -105,6 +110,10 @@ public final class SharedPluginProcessingContext {
 
     }
 
+    public void persistError(String method, Exception e) {
+        systemContext.persistError(tenantId, pluginId, method, e);
+    }
+
     public ActorRef self() {
         return currentActor;
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 25b69e2..54160bc 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -69,8 +69,10 @@ public class DefaultActorService implements ActorService {
 
     public static final String APP_DISPATCHER_NAME = "app-dispatcher";
     public static final String CORE_DISPATCHER_NAME = "core-dispatcher";
-    public static final String RULE_DISPATCHER_NAME = "rule-dispatcher";
-    public static final String PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
+    public static final String SYSTEM_RULE_DISPATCHER_NAME = "system-rule-dispatcher";
+    public static final String SYSTEM_PLUGIN_DISPATCHER_NAME = "system-plugin-dispatcher";
+    public static final String TENANT_RULE_DISPATCHER_NAME = "rule-dispatcher";
+    public static final String TENANT_PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
     public static final String SESSION_DISPATCHER_NAME = "session-dispatcher";
     public static final String RPC_DISPATCHER_NAME = "rpc-dispatcher";
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
index d0e1e6a..19a8d11 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
@@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.plugin.PluginActor;
 import org.thingsboard.server.actors.service.ContextAwareActor;
-import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable;
@@ -60,10 +59,12 @@ public abstract class PluginManager {
 
     abstract TenantId getTenantId();
 
+    abstract String getDispatcherName();
+
     public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
         return pluginActors.computeIfAbsent(pluginId, pId ->
                 context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
-                        .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pId.toString()));
+                        .withDispatcher(getDispatcherName()), pId.toString()));
     }
 
     public void broadcast(Object msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
index 95439e9..9abe694 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.actors.shared.plugin;
 
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
 import org.thingsboard.server.common.data.plugin.PluginMetaData;
@@ -37,4 +38,8 @@ public class SystemPluginManager extends PluginManager {
         return BasePluginService.SYSTEM_TENANT;
     }
 
+    @Override
+    protected String getDispatcherName() {
+        return DefaultActorService.SYSTEM_PLUGIN_DISPATCHER_NAME;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
index 3651d10..89f9efe 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.actors.shared.plugin;
 
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
 import org.thingsboard.server.common.data.plugin.PluginMetaData;
@@ -38,4 +39,10 @@ public class TenantPluginManager extends PluginManager {
     TenantId getTenantId() {
         return tenantId;
     }
+
+    @Override
+    protected String getDispatcherName() {
+        return DefaultActorService.TENANT_PLUGIN_DISPATCHER_NAME;
+    }
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
index ba563c3..db61b81 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
@@ -100,10 +100,12 @@ public abstract class RuleManager {
 
     abstract FetchFunction<RuleMetaData> getFetchRulesFunction();
 
+    abstract String getDispatcherName();
+
     public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) {
         return ruleActors.computeIfAbsent(ruleId, rId ->
                 context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId))
-                        .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), rId.toString()));
+                        .withDispatcher(getDispatcherName()), rId.toString()));
     }
 
     public RuleActorChain getRuleChain() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java
index 91bc5aa..8158d77 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.actors.shared.rule;
 
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
 import org.thingsboard.server.common.data.rule.RuleMetaData;
@@ -32,4 +33,8 @@ public class SystemRuleManager extends RuleManager {
         return ruleService::findSystemRules;
     }
 
+    @Override
+    String getDispatcherName() {
+        return DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
index 2cd75dc..05a1c74 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.actors.shared.rule;
 
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
 import org.thingsboard.server.common.data.rule.RuleMetaData;
@@ -31,4 +32,9 @@ public class TenantRuleManager extends RuleManager {
         return link -> ruleService.findTenantRules(tenantId, link);
     }
 
+    @Override
+    String getDispatcherName() {
+        return DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
+    }
+
 }
diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf
index 248e301..e8e899a 100644
--- a/application/src/main/resources/actor-system.conf
+++ b/application/src/main/resources/actor-system.conf
@@ -92,7 +92,53 @@ core-dispatcher {
   throughput = 5
 }
 
-# This dispatcher is used for rule actors
+# This dispatcher is used for system rule actors
+system-rule-dispatcher {
+  type = Dispatcher
+  executor = "fork-join-executor"
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 12
+
+    # The parallelism factor is used to determine thread pool size using the
+    # following formula: ceil(available processors * factor). Resulting size
+    # is then bounded by the parallelism-min and parallelism-max values.
+    parallelism-factor = 0.25
+  }
+  # How long time the dispatcher will wait for new actors until it shuts down
+  shutdown-timeout = 1s
+
+  # Throughput defines the number of messages that are processed in a batch
+  # before the thread is returned to the pool. Set to 1 for as fair as possible.
+  throughput = 5
+}
+
+# This dispatcher is used for system plugin actors
+system-plugin-dispatcher {
+  type = Dispatcher
+  executor = "fork-join-executor"
+  fork-join-executor {
+    # Min number of threads to cap factor-based parallelism number to
+    parallelism-min = 2
+    # Max number of threads to cap factor-based parallelism number to
+    parallelism-max = 12
+
+    # The parallelism factor is used to determine thread pool size using the
+    # following formula: ceil(available processors * factor). Resulting size
+    # is then bounded by the parallelism-min and parallelism-max values.
+    parallelism-factor = 0.25
+  }
+  # How long time the dispatcher will wait for new actors until it shuts down
+  shutdown-timeout = 1s
+
+  # Throughput defines the number of messages that are processed in a batch
+  # before the thread is returned to the pool. Set to 1 for as fair as possible.
+  throughput = 5
+}
+
+# This dispatcher is used for tenant rule actors
 rule-dispatcher {
   type = Dispatcher
   executor = "fork-join-executor"
@@ -115,7 +161,7 @@ rule-dispatcher {
   throughput = 5
 }
 
-# This dispatcher is used for rule actors
+# This dispatcher is used for tenant plugin actors
 plugin-dispatcher {
   type = Dispatcher
   executor = "fork-join-executor"
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index d25c8db..988410d 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -16,10 +16,7 @@
 package org.thingsboard.server.extensions.api.plugins;
 
 import org.thingsboard.server.common.data.Device;
-import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -46,6 +43,8 @@ public interface PluginContext {
 
     Optional<PluginApiCallSecurityContext> getSecurityCtx();
 
+    void persistError(String method, Exception e);
+
     /*
         Device RPC API
      */
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java
index 24083bc..d0c3490 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/mail/MailPlugin.java
@@ -33,6 +33,9 @@ import org.thingsboard.server.extensions.core.action.mail.SendMailActionMsg;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * @author Andrew Shvayka
@@ -41,6 +44,9 @@ import java.util.Properties;
 @Slf4j
 public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implements RuleMsgHandler {
 
+    //TODO: Add logic to close this executor on shutdown.
+    private static final ExecutorService executor = Executors.newSingleThreadExecutor();
+
     private MailPluginConfiguration configuration;
     private JavaMailSenderImpl mailSender;
 
@@ -84,12 +90,14 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
     @Override
     public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
         if (msg.getPayload() instanceof SendMailActionMsg) {
-            try {
-                sendMail((SendMailActionMsg) msg.getPayload());
-            } catch (Exception e) {
-                log.warn("Failed to send email", e);
-                throw new RuleException("Failed to send email", e);
-            }
+            executor.submit(() -> {
+                try {
+                    sendMail((SendMailActionMsg) msg.getPayload());
+                } catch (Exception e) {
+                    log.warn("[{}] Failed to send email", ctx.getPluginId(), e);
+                    ctx.persistError("Failed to send email", e);
+                }
+            });
         } else {
             throw new RuntimeException("Not supported msg type: " + msg.getPayload().getClass() + "!");
         }