azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 04a3434..9e8d441 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -97,9 +97,9 @@ public class Executor implements Comparable<Executor> {
 
   @Override
   public String toString() {
-    return String.format("%s:%s (id: %s)",
+    return String.format("%s:%s (id: %s), active=%s",
         null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
-        this.port, this.id);
+        this.port, this.id, this.isActive);
   }
 
   public String getHost() {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index d583d59..ad0c167 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -277,7 +277,10 @@ public class AzkabanExecutorServer {
       checkState(port != -1);
       final Executor executor = this.executionLoader.fetchExecutor(host, port);
       if (executor == null) {
+        logger.info("This executor wasn't found in the DB. Adding self.");
         this.executionLoader.addExecutor(host, port);
+      } else {
+        logger.info("This executor is already in the DB. Found: " + executor);
       }
       // If executor already exists, ignore it
     } catch (final ExecutorManagerException e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index f0b77cc..29ec76a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -27,7 +27,6 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -372,18 +371,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 
   private void setActiveInternal(final boolean value)
       throws ExecutorManagerException {
-    final ExecutorLoader executorLoader = this.application.getExecutorLoader();
-    final Executor executor = executorLoader.fetchExecutor(this.application.getHost(),
-        this.application.getPort());
-    Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
-    if (executor.isActive() != value) {
-      executor.setActive(value);
-      executorLoader.updateExecutor(executor);
-      this.flowRunnerManager.setExecutorActive(value);
-    } else {
-      logger.warn(
-          "Set active action ignored. Executor is already " + (value ? "active" : "inactive"));
-    }
+    this.flowRunnerManager.setExecutorActive(value,
+        this.application.getHost(), this.application.getPort());
   }
 
   /**
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3cb2311..32e475b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -26,6 +26,7 @@ import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.execapp.metric.NumFailedFlowMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
@@ -48,6 +49,7 @@ import azkaban.utils.Props;
 import azkaban.utils.ThreadPoolExecutingListener;
 import azkaban.utils.TrackingThreadPool;
 import azkaban.utils.UndefinedPropertyException;
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -320,13 +322,20 @@ public class FlowRunnerManager implements EventListener,
     return allProjects;
   }
 
-  // todo chengren311: this method will be invoked by executor activate API, but in SOLO mode
-  // the API is not called. So we should either have everything run in "multi-executor" mode
-  // or make SOLO server mode call the API.
-  public void setExecutorActive(final boolean isActive) {
-    this.isExecutorActive = isActive;
-    if (this.isExecutorActive) {
-      this.installedProjects = this.loadExistingProjectsAsCache();
+  public void setExecutorActive(final boolean isActive, final String host, final int port)
+      throws ExecutorManagerException {
+    final Executor executor = this.executorLoader.fetchExecutor(host, port);
+    Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
+    if (executor.isActive() != isActive) {
+      executor.setActive(isActive);
+      this.executorLoader.updateExecutor(executor);
+      this.isExecutorActive = isActive;
+      if (this.isExecutorActive) {
+        this.installedProjects = this.loadExistingProjectsAsCache();
+      }
+    } else {
+      logger.info(
+          "Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
     }
   }
 
diff --git a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
index fab9267..124fe51 100644
--- a/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
+++ b/azkaban-solo-server/src/main/java/azkaban/soloserver/AzkabanSingleServer.java
@@ -124,6 +124,10 @@ public class AzkabanSingleServer {
     AzkabanExecutorServer.launch(this.executor);
     log.info("Azkaban Exec Server started...");
 
+    this.executor.getFlowRunnerManager()
+        .setExecutorActive(true, this.executor.getHost(), this.executor.getPort());
+    log.info("Azkaban Exec Server activated...");
+
     AzkabanWebServer.launch(this.webServer);
     log.info("Azkaban Web Server started...");
   }
diff --git a/azkaban-solo-server/src/main/resources/conf/azkaban.properties b/azkaban-solo-server/src/main/resources/conf/azkaban.properties
index 624c6c7..9fe1c3d 100644
--- a/azkaban-solo-server/src/main/resources/conf/azkaban.properties
+++ b/azkaban-solo-server/src/main/resources/conf/azkaban.properties
@@ -43,3 +43,4 @@ executor.connector.stats=true
 azkaban.jobtype.plugin.dir=plugins/jobtypes
 # Number of executions to be displayed
 azkaban.display.execution_page_size=16
+azkaban.use.multiple.executors=true