azkaban-aplcache

Always use multi-executor mode (#1986) * Always use multi-executor

10/18/2018 7:40:06 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 0f2b336..00f0135 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -58,8 +58,7 @@ public class Constants {
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
   public static final String DEFAULT_CONF_PATH = "conf";
-  public static final String AZKABAN_EXECUTOR_PORT_FILENAME = "executor.port";
-  public static final String AZKABAN_EXECUTOR_PORT_FILE = "executor.portfile";
+  public static final String DEFAULT_EXECUTOR_PORT_FILE = "executor.port";
 
   public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
 
@@ -156,10 +155,8 @@ public class Constants {
 
     // Legacy configs section, new configs should follow the naming convention of azkaban.server.<rest of the name> for server configs.
 
-    // The property is used for the web server to get the host name of the executor when running in SOLO mode.
-    public static final String EXECUTOR_HOST = "executor.host";
-
-    // The property is used for the web server to get the port of the executor when running in SOLO mode.
+    public static final String EXECUTOR_PORT_FILE = "executor.portfile";
+    // To set a fixed port for executor-server. Otherwise some available port is used.
     public static final String EXECUTOR_PORT = "executor.port";
 
     // Max flow running time in mins, server will kill flows running longer than this setting.
diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
index 28b8bb0..4b4356d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
@@ -16,8 +16,6 @@
 
 package azkaban.executor;
 
-import azkaban.Constants.ConfigurationKeys;
-import azkaban.utils.Props;
 import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
 import javax.inject.Inject;
@@ -31,12 +29,10 @@ public class ActiveExecutors {
   private static final Logger logger = Logger.getLogger(ExecutorManager.class);
 
   private volatile ImmutableSet<Executor> activeExecutors;
-  private final Props azkProps;
   private final ExecutorLoader executorLoader;
 
   @Inject
-  public ActiveExecutors(final Props azkProps, final ExecutorLoader executorLoader) {
-    this.azkProps = azkProps;
+  public ActiveExecutors(final ExecutorLoader executorLoader) {
     this.executorLoader = executorLoader;
   }
 
@@ -47,13 +43,7 @@ public class ActiveExecutors {
    * fails.
    */
   public void setupExecutors() throws ExecutorManagerException {
-    final ImmutableSet<Executor> newExecutors;
-    if (ExecutorManager.isMultiExecutorMode(this.azkProps)) {
-      newExecutors = setupMultiExecutors();
-    } else {
-      // TODO remove this - switch everything to use multi-executor mode
-      newExecutors = setupSingleExecutor();
-    }
+    final ImmutableSet<Executor> newExecutors = loadExecutors();
     if (newExecutors.isEmpty()) {
       final String error = "No active executors found";
       logger.error(error);
@@ -72,38 +62,9 @@ public class ActiveExecutors {
     return this.activeExecutors;
   }
 
-  private ImmutableSet<Executor> setupMultiExecutors() throws ExecutorManagerException {
-    logger.info("Initializing multi executors from database.");
+  private ImmutableSet<Executor> loadExecutors() throws ExecutorManagerException {
+    logger.info("Initializing executors from database.");
     return ImmutableSet.copyOf(this.executorLoader.fetchActiveExecutors());
   }
 
-  private ImmutableSet<Executor> setupSingleExecutor() throws ExecutorManagerException {
-    if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
-      return getOrAddSingleExecutor();
-    } else {
-      // throw exception when in single executor mode and no executor port specified in azkaban
-      // properties
-      //todo chengren311: convert to slf4j and parameterized logging
-      final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
-      logger.error(error);
-      throw new ExecutorManagerException(error);
-    }
-  }
-
-  private ImmutableSet<Executor> getOrAddSingleExecutor() throws ExecutorManagerException {
-    // add single executor, if specified as per properties
-    final String executorHost = this.azkProps
-        .getString(ConfigurationKeys.EXECUTOR_HOST, "localhost");
-    final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
-    logger.info(String.format("Initializing single executor %s:%d", executorHost, executorPort));
-    Executor executor = this.executorLoader.fetchExecutor(executorHost, executorPort);
-    if (executor == null) {
-      executor = this.executorLoader.addExecutor(executorHost, executorPort);
-    } else if (!executor.isActive()) {
-      executor.setActive(true);
-      this.executorLoader.updateExecutor(executor);
-    }
-    return ImmutableSet.of(new Executor(executor.getId(), executorHost, executorPort, true));
-  }
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3fede00..87e0346 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -108,8 +108,9 @@ public class ExecutorManager extends EventHandler implements
   private List<String> filterList;
   private Map<String, Integer> comparatorWeightsMap;
   private long lastSuccessfulExecutorInfoRefresh;
-  private ExecutorService executorInforRefresherService;
+  private final ExecutorService executorInfoRefresherService;
   private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
+  private boolean initialized = false;
 
   @Inject
   public ExecutorManager(final Props azkProps, final ExecutorLoader executorLoader,
@@ -129,30 +130,39 @@ public class ExecutorManager extends EventHandler implements
     this.updaterStage = updaterStage;
     this.executionFinalizer = executionFinalizer;
     this.updaterThread = updaterThread;
-    this.setupExecutors();
-    this.loadRunningExecutions();
-
-    this.queuedFlows = new QueuedExecutions(
-        azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
+    this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
+    this.cleanerThread = createCleanerThread();
+    this.executorInfoRefresherService = createExecutorInfoRefresherService();
+  }
 
+  private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
     // The default threshold is set to 30 for now, in case some users are affected. We may
     // decrease this number in future, to better prevent DDos attacks.
-    this.maxConcurrentRunsOneFlow = azkProps
-        .getInt(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
-            DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
-    this.loadQueuedFlows();
+    return azkProps.getInt(ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
+        DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
+  }
 
-    this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
+  private CleanerThread createCleanerThread() {
+    final long executionLogsRetentionMs = this.azkProps.getLong("execution.logs.retention.ms",
+        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+    return new CleanerThread(executionLogsRetentionMs);
+  }
 
-    if (isMultiExecutorMode()) {
-      setupMultiExecutorMode();
+  void initialize() throws ExecutorManagerException {
+    if (this.initialized) {
+      return;
     }
-
-    final long executionLogsRetentionMs =
-        azkProps.getLong("execution.logs.retention.ms",
-            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
-
-    this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
+    this.initialized = true;
+    this.setupExecutors();
+    this.loadRunningExecutions();
+    this.queuedFlows = new QueuedExecutions(
+        this.azkProps.getLong(ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
+    this.loadQueuedFlows();
+    this.cacheDir = new File(this.azkProps.getString("cache.directory", "cache"));
+    // TODO extract QueueProcessor as a separate class, move all of this into it
+    setupExecutotrComparatorWeightsMap();
+    setupExecutorFilterList();
+    this.queueProcessor = setupQueueProcessor();
   }
 
   // TODO move to some common place
@@ -167,17 +177,11 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  // TODO switch to always use "multi executor mode" - even for single server
-  public static boolean isMultiExecutorMode(final Props props) {
-    return props.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
-  }
-
-  public void start() {
+  public void start() throws ExecutorManagerException {
+    initialize();
     this.updaterThread.start();
     this.cleanerThread.start();
-    if (isMultiExecutorMode()) {
-      this.queueProcessor.start();
-    }
+    this.queueProcessor.start();
   }
 
   private String findApplicationIdFromLog(final String logData) {
@@ -190,39 +194,42 @@ public class ExecutorManager extends EventHandler implements
     return appId;
   }
 
-  private void setupMultiExecutorMode() {
-    // initialize hard filters for executor selector from azkaban.properties
-    final String filters = this.azkProps
-        .getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
-    if (filters != null) {
-      this.filterList = Arrays.asList(StringUtils.split(filters, ","));
-    }
+  private QueueProcessorThread setupQueueProcessor() {
+    return new QueueProcessorThread(
+        this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
+        this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
+        this.azkProps.getInt(
+            Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
+        this.azkProps.getInt(
+            Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
+            this.activeExecutors.getAll().size()),
+        this.sleepAfterDispatchFailure);
+  }
 
+  private void setupExecutotrComparatorWeightsMap() {
     // initialize comparator feature weights for executor selector from azkaban.properties
     final Map<String, String> compListStrings = this.azkProps
-        .getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+        .getMapByPrefix(ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
     if (compListStrings != null) {
       this.comparatorWeightsMap = new TreeMap<>();
       for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
         this.comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
       }
     }
+  }
 
-    this.executorInforRefresherService =
-        Executors.newFixedThreadPool(this.azkProps.getInt(
-            Constants.ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+  private void setupExecutorFilterList() {
+    // initialize hard filters for executor selector from azkaban.properties
+    final String filters = this.azkProps
+        .getString(ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
+    if (filters != null) {
+      this.filterList = Arrays.asList(StringUtils.split(filters, ","));
+    }
+  }
 
-    // configure queue processor
-    this.queueProcessor =
-        new QueueProcessorThread(
-            this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
-            this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
-            this.azkProps.getInt(
-                Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
-            this.azkProps.getInt(
-                Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
-                this.activeExecutors.getAll().size()),
-            this.sleepAfterDispatchFailure);
+  private ExecutorService createExecutorInfoRefresherService() {
+    return Executors.newFixedThreadPool(this.azkProps.getInt(
+        ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
   }
 
   /**
@@ -232,11 +239,21 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public void setupExecutors() throws ExecutorManagerException {
+    checkMultiExecutorMode();
     this.activeExecutors.setupExecutors();
   }
 
-  private boolean isMultiExecutorMode() {
-    return isMultiExecutorMode(this.azkProps);
+  // TODO Enforced for now to ensure that users migrate to multi-executor mode acknowledgingly.
+  // TODO Remove this once confident enough that all active users have already updated to some
+  // version new enough to have this change - for example after 1 year has passed.
+  // TODO Then also delete ConfigurationKeys.USE_MULTIPLE_EXECUTORS.
+  @Deprecated
+  private void checkMultiExecutorMode() {
+    if (!this.azkProps.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false)) {
+      throw new IllegalArgumentException(
+          Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS +
+              " must be true. Single executor mode is not supported any more.");
+    }
   }
 
   /**
@@ -249,7 +266,7 @@ public class ExecutorManager extends EventHandler implements
     for (final Executor executor : this.activeExecutors.getAll()) {
       // execute each executorInfo refresh task to fetch
       final Future<ExecutorInfo> fetchExecutionInfo =
-          this.executorInforRefresherService.submit(
+          this.executorInfoRefresherService.submit(
               () -> this.apiGateway.callForJsonType(executor.getHost(),
                   executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
       futures.add(new Pair<>(executor,
@@ -286,41 +303,23 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /**
-   * Throws exception if running in local mode {@inheritDoc}
-   *
    * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
    */
   @Override
-  public void disableQueueProcessorThread() throws ExecutorManagerException {
-    if (isMultiExecutorMode()) {
-      this.queueProcessor.setActive(false);
-    } else {
-      throw new ExecutorManagerException(
-          "Cannot disable QueueProcessor in local mode");
-    }
+  public void disableQueueProcessorThread() {
+    this.queueProcessor.setActive(false);
   }
 
   /**
-   * Throws exception if running in local mode {@inheritDoc}
-   *
    * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
    */
   @Override
-  public void enableQueueProcessorThread() throws ExecutorManagerException {
-    if (isMultiExecutorMode()) {
-      this.queueProcessor.setActive(true);
-    } else {
-      throw new ExecutorManagerException(
-          "Cannot enable QueueProcessor in local mode");
-    }
+  public void enableQueueProcessorThread() {
+    this.queueProcessor.setActive(true);
   }
 
   public State getQueueProcessorThreadState() {
-    if (isMultiExecutorMode()) {
-      return this.queueProcessor.getState();
-    } else {
-      return State.NEW; // not started in local mode
-    }
+    return this.queueProcessor.getState();
   }
 
   /**
@@ -328,11 +327,7 @@ public class ExecutorManager extends EventHandler implements
    * dispatched as expected
    */
   public boolean isQueueProcessorThreadActive() {
-    if (isMultiExecutorMode()) {
-      return this.queueProcessor.isActive();
-    } else {
-      return false;
-    }
+    return this.queueProcessor.isActive();
   }
 
   /**
@@ -1113,30 +1108,9 @@ public class ExecutorManager extends EventHandler implements
         final ExecutionReference reference =
             new ExecutionReference(exflow.getExecutionId());
 
-        if (isMultiExecutorMode()) {
-          //Take MultiExecutor route
-          this.executorLoader.addActiveExecutableReference(reference);
-          this.queuedFlows.enqueue(exflow, reference);
-        } else {
-          // assign only local executor we have
-          final Executor choosenExecutor = this.activeExecutors.getAll().iterator().next();
-          this.executorLoader.addActiveExecutableReference(reference);
-          try {
-            dispatch(reference, exflow, choosenExecutor);
-            this.commonMetrics.markDispatchSuccess();
-          } catch (final ExecutorManagerException e) {
-            // When flow dispatch fails, should update the flow status
-            // to FAILED in execution_flows DB table as well. Currently
-            // this logic is only implemented in multiExecutorMode but
-            // missed in single executor case.
-            this.commonMetrics.markDispatchFail();
-            this.executionFinalizer.finalizeFlow(exflow, "Dispatching failed", e);
-            throw e;
-          }
-        }
-        message +=
-            "Execution submitted successfully with exec id "
-                + exflow.getExecutionId();
+        this.executorLoader.addActiveExecutableReference(reference);
+        this.queuedFlows.enqueue(exflow, reference);
+        message += "Execution queued successfully with exec id " + exflow.getExecutionId();
       }
       return message;
     }
@@ -1199,9 +1173,7 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void shutdown() {
-    if (isMultiExecutorMode()) {
-      this.queueProcessor.shutdown();
-    }
+    this.queueProcessor.shutdown();
     this.updaterThread.shutdown();
   }
 
@@ -1478,7 +1450,7 @@ public class ExecutorManager extends EventHandler implements
             + "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
             + " (tried " + reference.getNumErrors() + " executors)";
         ExecutorManager.logger.error(message);
-        executionFinalizer.finalizeFlow(exflow, message, lastError);
+        ExecutorManager.this.executionFinalizer.finalizeFlow(exflow, message, lastError);
       }
     }
 
@@ -1486,7 +1458,7 @@ public class ExecutorManager extends EventHandler implements
         final Executor selectedExecutor) {
       remainingExecutors.remove(selectedExecutor);
       if (remainingExecutors.isEmpty()) {
-        remainingExecutors.addAll(activeExecutors.getAll());
+        remainingExecutors.addAll(ExecutorManager.this.activeExecutors.getAll());
         sleepAfterDispatchFailure();
       }
     }
@@ -1579,7 +1551,7 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @VisibleForTesting
-  void setSleepAfterDispatchFailure(Duration sleepAfterDispatchFailure) {
+  void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
     this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 48017c6..81e1419 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.alert.Alerter;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
@@ -113,21 +115,15 @@ public class ExecutorManagerTest {
   }
 
   /*
-   * Test backward compatibility with just local executor
+   * Test error message with unsupported local executor conf
    */
   @Test
-  public void testLocalExecutorScenario() throws Exception {
-    this.props.put("executor.port", 12345);
-    final ExecutorManager manager = createExecutorManager();
-    final Set<Executor> activeExecutors =
-        new HashSet(manager.getAllActiveExecutors());
-
-    Assert.assertEquals(activeExecutors.size(), 1);
-    final Executor executor = activeExecutors.iterator().next();
-    Assert.assertEquals(executor.getHost(), "localhost");
-    Assert.assertEquals(executor.getPort(), 12345);
-    Assert.assertArrayEquals(activeExecutors.toArray(), this.loader
-        .fetchActiveExecutors().toArray());
+  public void testLocalExecutorScenario() {
+    this.props.put(ConfigurationKeys.EXECUTOR_PORT, 12345);
+    final Throwable thrown = catchThrowable(() -> createExecutorManager());
+    assertThat(thrown).isInstanceOf(IllegalArgumentException.class);
+    assertThat(thrown.getMessage()).isEqualTo(
+        "azkaban.use.multiple.executors must be true. Single executor mode is not supported any more.");
   }
 
   /*
@@ -149,7 +145,7 @@ public class ExecutorManagerTest {
   private ExecutorManager createExecutorManager()
       throws ExecutorManagerException {
     // TODO rename this test to ExecutorManagerIntegrationTest & create separate unit tests as well?
-    final ActiveExecutors activeExecutors = new ActiveExecutors(this.props, this.loader);
+    final ActiveExecutors activeExecutors = new ActiveExecutors(this.loader);
     final ExecutionFinalizer executionFinalizer = new ExecutionFinalizer(this.loader,
         this.updaterStage, this.alertHolder, this.runningExecutions);
     final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
@@ -162,6 +158,7 @@ public class ExecutorManagerTest {
         this.commonMetrics, this.apiGateway, this.runningExecutions, activeExecutors,
         this.updaterStage, executionFinalizer, updaterThread);
     executorManager.setSleepAfterDispatchFailure(Duration.ZERO);
+    executorManager.initialize();
     return executorManager;
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index c8ac487..6021af2 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -18,6 +18,7 @@ package azkaban.trigger;
 
 import static org.mockito.Mockito.mock;
 
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.executor.ActiveExecutors;
 import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutionFinalizer;
@@ -61,7 +62,7 @@ public class TriggerManagerDeadlockTest {
     this.loader = new MockTriggerLoader();
     final Props props = new Props();
     props.put("trigger.scan.interval", 1000);
-    props.put("executor.port", 12321);
+    props.put(ConfigurationKeys.EXECUTOR_PORT, 12321);
     this.execLoader = new MockExecutorLoader();
     this.apiGateway = mock(ExecutorApiGateway.class);
     this.runningExecutions = new RunningExecutions();
@@ -75,7 +76,7 @@ public class TriggerManagerDeadlockTest {
   }
 
   private ExecutorManager getExecutorManager(final Props props) throws ExecutorManagerException {
-    final ActiveExecutors activeExecutors = new ActiveExecutors(props, this.execLoader);
+    final ActiveExecutors activeExecutors = new ActiveExecutors(this.execLoader);
     final RunningExecutionsUpdaterThread updaterThread = getRunningExecutionsUpdaterThread();
     return new ExecutorManager(props, this.execLoader, this.commonMetrics, this.apiGateway,
         this.runningExecutions, activeExecutors, this.updaterStage, this.executionFinalizer,
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 99031f2..195cb52 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -16,7 +16,7 @@
 
 package azkaban.execapp;
 
-import static azkaban.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.Constants.DEFAULT_EXECUTOR_PORT_FILE;
 import static azkaban.Constants.ConfigurationKeys;
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import static azkaban.execapp.ExecJettyServerModule.EXEC_JETTY_SERVER;
@@ -289,7 +289,7 @@ public class AzkabanExecutorServer {
   private void dumpPortToFile() throws IOException {
     // By default this should write to the working directory
     final String portFileName = this.props
-        .getString(Constants.AZKABAN_EXECUTOR_PORT_FILE, AZKABAN_EXECUTOR_PORT_FILENAME);
+        .getString(ConfigurationKeys.EXECUTOR_PORT_FILE, DEFAULT_EXECUTOR_PORT_FILE);
     FileIOUtils.dumpNumberToFile(Paths.get(portFileName), getPort());
   }
 
@@ -468,7 +468,7 @@ public class AzkabanExecutorServer {
    * @return hostname
    */
   public String getHost() {
-    if (this.props.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME)) {
+    if (this.props.containsKey(ConfigurationKeys.AZKABAN_SERVER_HOST_NAME)) {
       final String hostName = this.props
           .getString(Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME);
       if (!StringUtils.isEmpty(hostName)) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
index f449281..3bd2b63 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
@@ -1,5 +1,6 @@
 package azkaban.execapp;
 
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
@@ -38,7 +39,7 @@ public class ExecJettyServerModule extends AbstractModule {
      * The Jetty server automatically finds an unused port when the port number is set to zero
      * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
      */
-    final Server server = new Server(props.getInt("executor.port", 0));
+    final Server server = new Server(props.getInt(ConfigurationKeys.EXECUTOR_PORT, 0));
     final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
     server.setThreadPool(httpThreadPool);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java
index 2e50b15..35b01ae 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java
@@ -75,7 +75,7 @@ public class AzkabanExecutorServerTest {
   public static void tearDown() throws Exception {
     deleteQuietly(new File("h2.mv.db"));
     deleteQuietly(new File("h2.trace.db"));
-    deleteQuietly(new File("executor.port"));
+    deleteQuietly(new File(Constants.DEFAULT_EXECUTOR_PORT_FILE));
     deleteQuietly(new File("executions"));
     deleteQuietly(new File("projects"));
   }
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index 2620247..cc2903c 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import azkaban.AzkabanCommonModule;
 import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.execapp.AzkabanExecServerModule;
@@ -62,10 +63,10 @@ public class AzkabanSingleServerTest {
   }
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDown() {
     deleteQuietly(new File("h2.mv.db"));
     deleteQuietly(new File("h2.trace.db"));
-    deleteQuietly(new File("executor.port"));
+    deleteQuietly(new File(Constants.DEFAULT_EXECUTOR_PORT_FILE));
     deleteQuietly(new File("executions"));
     deleteQuietly(new File("projects"));
   }
@@ -79,13 +80,13 @@ public class AzkabanSingleServerTest {
     props.put("database.type", "h2");
     props.put("h2.path", "./h2");
 
-    props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "false");
+    props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     props.put("server.port", "0");
     props.put("jetty.port", "0");
     props.put("server.useSSL", "true");
     props.put("jetty.use.ssl", "false");
     props.put("user.manager.xml.file", new File(confPath, "azkaban-users.xml").getPath());
-    props.put("executor.port", "12321");
+    props.put(ConfigurationKeys.EXECUTOR_PORT, "12321");
 
     // Quartz settings
     props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
@@ -98,13 +99,13 @@ public class AzkabanSingleServerTest {
   }
 
   @Test
-  public void testInjection() throws Exception {
+  public void testInjection() {
     SERVICE_PROVIDER.unsetInjector();
     /* Initialize Guice Injector */
     final Injector injector = Guice.createInjector(
         new AzkabanCommonModule(props),
-        new AzkabanWebServerModule(),
-        new AzkabanExecServerModule()
+        new AzkabanExecServerModule(),
+        new AzkabanWebServerModule()
     );
     SERVICE_PROVIDER.setInjector(injector);
 
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index cb69183..7a11e4e 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -111,7 +111,7 @@ public class AzkabanWebServerTest {
 
     deleteQuietly(new File("h2.mv.db"));
     deleteQuietly(new File("h2.trace.db"));
-    deleteQuietly(new File("executor.port"));
+    deleteQuietly(new File(Constants.DEFAULT_EXECUTOR_PORT_FILE));
     deleteQuietly(new File("executions"));
     deleteQuietly(new File("projects"));
   }
diff --git a/docs/configuration.rst b/docs/configuration.rst
index b6d0b10..7277750 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -216,13 +216,6 @@ Executor Manager Properties
 +-----------------------+-----------------------+-----------------------+
 | Parameter             | Description           | Default               |
 +=======================+=======================+=======================+
-| executor.port         | The port for the      | 12321                 |
-|                       | azkaban executor      |                       |
-|                       | server                |                       |
-+-----------------------+-----------------------+-----------------------+
-| executor.host         | The host for azkaban  | localhost             |
-|                       | executor server       |                       |
-+-----------------------+-----------------------+-----------------------+
 | execution.logs.retent | Time in milliseconds  | 7257600000L (12       |
 | ion.ms                | that execution logs   | weeks)                |
 |                       | are retained          |                       |
@@ -288,7 +281,7 @@ Executor Server Properties
 +-----------------------+-----------------------+-----------------------+
 | Parameter             | Description           | Default               |
 +=======================+=======================+=======================+
-|   executor.port       | The port for azkaban  | 12321                 |
+|   executor.port       | The port for azkaban  | 0 (any free port)     |
 |                       | executor server       |                       |
 +-----------------------+-----------------------+-----------------------+
 |   executor.global.pro | A path to the         |   none                |