azkaban-developers

AZNewDispatchingLogic - Executor failure detection (#2098) *

1/22/2019 5:21:44 PM
3.68.0

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index afcea5a..4c4bca5 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -94,6 +94,11 @@ public class Constants {
     public static final String AZKABAN_POLL_MODEL = "azkaban.poll.model";
     public static final String AZKABAN_POLLING_INTERVAL_MS = "azkaban.polling.interval.ms";
 
+    // Configures properties for Azkaban executor health check
+    public static final String AZKABAN_EXECUTOR_HEALTHCHECK_INTERVAL_MIN = "azkaban.executor.healthcheck.interval.min";
+    public static final String AZKABAN_EXECUTOR_MAX_FAILURE_COUNT = "azkaban.executor.max.failurecount";
+    public static final String AZKABAN_ADMIN_ALERT_EMAIL = "azkaban.admin.alert.email";
+
     // Configures Azkaban Flow Version in project YAML file
     public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 0de0713..20e4f15 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -55,14 +55,17 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   private final ExecutorLoader executorLoader;
   private final ExecutorApiGateway apiGateway;
   private final AlerterHolder alerterHolder;
+  private final ExecutorHealthChecker executorHealthChecker;
   private final int maxConcurrentRunsOneFlow;
 
   @Inject
   ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
-      final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder) {
+      final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder, final
+  ExecutorHealthChecker executorHealthChecker) {
     this.executorLoader = executorLoader;
     this.apiGateway = apiGateway;
     this.alerterHolder = alerterHolder;
+    this.executorHealthChecker = executorHealthChecker;
     this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
   }
 
@@ -112,7 +115,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
     try {
       executors = this.executorLoader.fetchActiveExecutors();
     } catch (final ExecutorManagerException e) {
-      this.logger.error("Failed to get all active executors.", e);
+      logger.error("Failed to get all active executors.", e);
     }
     return executors;
   }
@@ -130,7 +133,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
         ports.add(executor.getHost() + ":" + executor.getPort());
       }
     } catch (final ExecutorManagerException e) {
-      this.logger.error("Failed to get primary server hosts.", e);
+      logger.error("Failed to get primary server hosts.", e);
     }
     return ports;
   }
@@ -149,7 +152,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
         }
       }
     } catch (final ExecutorManagerException e) {
-      this.logger.error("Failed to get all active executor server hosts.", e);
+      logger.error("Failed to get all active executor server hosts.", e);
     }
     return ports;
   }
@@ -167,7 +170,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
       executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
           this.executorLoader.fetchUnfinishedFlows().values()));
     } catch (final ExecutorManagerException e) {
-      this.logger.error("Failed to get running flows for project " + projectId + ", flow "
+      logger.error("Failed to get running flows for project " + projectId + ", flow "
           + flowId, e);
     }
     return executionIds;
@@ -193,7 +196,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
     try {
       getActiveFlowsWithExecutorHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
     } catch (final ExecutorManagerException e) {
-      this.logger.error("Failed to get active flows with executor.", e);
+      logger.error("Failed to get active flows with executor.", e);
     }
     return flows;
   }
@@ -220,7 +223,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
           this.executorLoader.fetchUnfinishedFlows().values());
 
     } catch (final ExecutorManagerException e) {
-      this.logger.error(
+      logger.error(
           "Failed to check if the flow is running for project " + projectId + ", flow " + flowId,
           e);
     }
@@ -257,7 +260,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
     try {
       getFlowsHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
     } catch (final ExecutorManagerException e) {
-      this.logger.error("Failed to get running flows.", e);
+      logger.error("Failed to get running flows.", e);
     }
     return flows;
   }
@@ -669,10 +672,14 @@ public class ExecutionController extends EventHandler implements ExecutorManager
         Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
   }
 
+  @Override
+  public void start() {
+    this.executorHealthChecker.start();
+  }
 
   @Override
   public void shutdown() {
-    //Todo: shutdown any thread that is running
+    this.executorHealthChecker.shutdown();
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
new file mode 100644
index 0000000..ac774e0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
@@ -0,0 +1,180 @@
+/*
+* Copyright 2019 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically checks the health of executors. Finalizes flows or sends alert emails when needed.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+@Singleton
+public class ExecutorHealthChecker {
+
+  private static final Logger logger = LoggerFactory.getLogger(ExecutorHealthChecker.class);
+  // Max number of executor failures before sending out alert emails.
+  private static final int DEFAULT_EXECUTOR_MAX_FAILURE_COUNT = 6;
+  // Web server checks executor health every 5 min by default.
+  private static final Duration DEFAULT_EXECUTOR_HEALTHCHECK_INTERVAL = Duration.ofMinutes(5);
+  private final long healthCheckIntervalMin;
+  private final int executorMaxFailureCount;
+  private final List<String> alertEmails;
+  private final ScheduledExecutorService scheduler;
+  private final ExecutorLoader executorLoader;
+  private final ExecutorApiGateway apiGateway;
+  private final AlerterHolder alerterHolder;
+  private final Map<Integer, Integer> executorFailureCount = new HashMap<>();
+
+  @Inject
+  public ExecutorHealthChecker(final Props azkProps, final ExecutorLoader executorLoader,
+      final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder) {
+    this.healthCheckIntervalMin = azkProps
+        .getLong(ConfigurationKeys.AZKABAN_EXECUTOR_HEALTHCHECK_INTERVAL_MIN,
+            DEFAULT_EXECUTOR_HEALTHCHECK_INTERVAL.toMinutes());
+    this.executorMaxFailureCount = azkProps.getInt(ConfigurationKeys
+        .AZKABAN_EXECUTOR_MAX_FAILURE_COUNT, DEFAULT_EXECUTOR_MAX_FAILURE_COUNT);
+    this.alertEmails = azkProps.getStringList(ConfigurationKeys.AZKABAN_ADMIN_ALERT_EMAIL);
+    this.scheduler = Executors.newSingleThreadScheduledExecutor();
+    this.executorLoader = executorLoader;
+    this.apiGateway = apiGateway;
+    this.alerterHolder = alerterHolder;
+  }
+
+  public void start() {
+    logger.info("Starting executor health checker.");
+    this.scheduler.scheduleAtFixedRate(() -> checkExecutorHealth(), 0L, this.healthCheckIntervalMin,
+        TimeUnit.MINUTES);
+  }
+
+  public void shutdown() {
+    this.scheduler.shutdown();
+    try {
+      if (!this.scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+        this.scheduler.shutdownNow();
+      }
+    } catch (final InterruptedException ex) {
+      this.scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Checks executor health. Finalizes the flow if its executor is already removed from DB or
+   * sends alert emails if the executor isn't alive any more.
+   */
+  public void checkExecutorHealth() {
+    final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
+    for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap.entrySet()) {
+      final Optional<Executor> executorOption = entry.getKey();
+      if (!executorOption.isPresent()) {
+        final String finalizeReason = "Executor id of this execution doesn't exist.";
+        for (final ExecutableFlow flow : entry.getValue()) {
+          logger.warn(
+              String.format("Finalizing execution %s, %s", flow.getExecutionId(), finalizeReason));
+          ExecutionControllerUtils
+              .finalizeFlow(this.executorLoader, this.alerterHolder, flow, finalizeReason, null);
+        }
+        continue;
+      }
+
+      final Executor executor = executorOption.get();
+      try {
+        // Todo jamiesjc: add metrics to monitor the http call return time
+        final Map<String, Object> results = this.apiGateway
+            .callWithExecutionId(executor.getHost(), executor.getPort(),
+                ConnectorParams.PING_ACTION, null, null);
+        if (results == null || results.containsKey(ConnectorParams.RESPONSE_ERROR) || !results
+            .containsKey(ConnectorParams.STATUS_PARAM) || !results.get(ConnectorParams.STATUS_PARAM)
+            .equals(ConnectorParams.RESPONSE_ALIVE)) {
+          throw new ExecutorManagerException("Status of executor " + executor.getId() + " is "
+              + "not alive.");
+        } else {
+          // Executor is alive. Clear the failure count.
+          if (this.executorFailureCount.containsKey(executor.getId())) {
+            this.executorFailureCount.put(executor.getId(), 0);
+          }
+        }
+      } catch (final ExecutorManagerException e) {
+        handleExecutorNotAliveCase(entry, executor, e);
+      }
+    }
+  }
+
+  /**
+   * Groups Executable flow by Executors to reduce number of REST calls.
+   *
+   * @return executor to list of flows map
+   */
+  private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
+    final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap = new HashMap<>();
+    try {
+      for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this
+          .executorLoader.fetchActiveFlows().values()) {
+        final Optional<Executor> executor = runningFlow.getFirst().getExecutor();
+        List<ExecutableFlow> flows = exFlowMap.get(executor);
+        if (flows == null) {
+          flows = new ArrayList<>();
+          exFlowMap.put(executor, flows);
+        }
+        flows.add(runningFlow.getSecond());
+      }
+    } catch (final ExecutorManagerException e) {
+      logger.error("Failed to get flow to executor map");
+    }
+    return exFlowMap;
+  }
+
+  /**
+   * Increments executor failure count. If it reaches max failure count, sends alert emails to AZ
+   * admin.
+   *
+   * @param entry executor to list of flows map entry
+   * @param executor the executor
+   * @param e Exception thrown when the executor is not alive
+   */
+  private void handleExecutorNotAliveCase(
+      final Entry<Optional<Executor>, List<ExecutableFlow>> entry, final Executor executor,
+      final ExecutorManagerException e) {
+    logger.error("Failed to get update from executor " + executor.getId(), e);
+    this.executorFailureCount.put(executor.getId(), this.executorFailureCount.getOrDefault
+        (executor.getId(), 0) + 1);
+    if (this.executorFailureCount.get(executor.getId()) % this.executorMaxFailureCount == 0
+        && !this.alertEmails.isEmpty()) {
+      entry.getValue().stream().forEach(flow -> flow
+          .getExecutionOptions().setFailureEmails(this.alertEmails));
+      logger.info(String.format("Executor failure count is %d. Sending alert emails to %s.",
+          this.executorFailureCount.get(executor.getId()), this.alertEmails));
+      this.alerterHolder.get("email").alertOnFailedUpdate(executor, entry.getValue(), e);
+    }
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 0125778..f852732 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -164,6 +164,7 @@ public class ExecutorManager extends EventHandler implements
     this.queueProcessor = setupQueueProcessor();
   }
 
+  @Override
   public void start() throws ExecutorManagerException {
     initialize();
     this.updaterThread.start();
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 907ab1c..e229153 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -116,6 +116,8 @@ public interface ExecutorManagerAdapter {
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException;
 
+  public void start() throws ExecutorManagerException;
+
   public void shutdown();
 
   public Set<String> getAllActiveExecutorServerHosts();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index 9b8ab3c..209a75d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -51,6 +50,7 @@ public class ExecutionControllerTest {
   private ExecutorLoader loader;
   private ExecutorApiGateway apiGateway;
   private AlerterHolder alertHolder;
+  private ExecutorHealthChecker executorHealthChecker;
   private Props props;
   private User user;
   private ExecutableFlow flow1;
@@ -69,8 +69,9 @@ public class ExecutionControllerTest {
     this.apiGateway = mock(ExecutorApiGateway.class);
     this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
     this.alertHolder = mock(AlerterHolder.class);
+    this.executorHealthChecker = mock(ExecutorHealthChecker.class);
     this.controller = new ExecutionController(this.props, this.loader, this.apiGateway,
-        this.alertHolder);
+        this.alertHolder, this.executorHealthChecker);
 
     final Executor executor1 = new Executor(1, "localhost", 12345, true);
     final Executor executor2 = new Executor(2, "localhost", 12346, true);
@@ -99,13 +100,6 @@ public class ExecutionControllerTest {
     when(this.loader.fetchQueuedFlows()).thenReturn(this.queuedFlows);
   }
 
-  @After
-  public void tearDown() {
-    if (this.controller != null) {
-      this.controller.shutdown();
-    }
-  }
-
   @Test
   public void testFetchAllActiveFlows() throws Exception {
     initializeUnfinishedFlows();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorHealthCheckerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorHealthCheckerTest.java
new file mode 100644
index 0000000..ef29bd2
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorHealthCheckerTest.java
@@ -0,0 +1,136 @@
+/*
+* Copyright 2019 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.alert.Alerter;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test case for executor health checker.
+ */
+public class ExecutorHealthCheckerTest {
+
+  private static final int EXECUTION_ID_11 = 11;
+  private static final String AZ_ADMIN_ALERT_EMAIL = "az_admin1@foo.com,az_admin2@foo.com";
+  private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+  private ExecutorHealthChecker executorHealthChecker;
+  private Props props;
+  private ExecutorLoader loader;
+  private ExecutorApiGateway apiGateway;
+  private Alerter mailAlerter;
+  private AlerterHolder alerterHolder;
+  private ExecutableFlow flow1;
+  private Executor executor1;
+
+  @Before
+  public void setUp() throws Exception {
+    this.props = new Props();
+    this.props.put(ConfigurationKeys.AZKABAN_EXECUTOR_MAX_FAILURE_COUNT, 2);
+    this.props.put(ConfigurationKeys.AZKABAN_ADMIN_ALERT_EMAIL, AZ_ADMIN_ALERT_EMAIL);
+    this.loader = mock(ExecutorLoader.class);
+    this.mailAlerter = mock(Alerter.class);
+    this.alerterHolder = mock(AlerterHolder.class);
+    this.apiGateway = mock(ExecutorApiGateway.class);
+    this.executorHealthChecker = new ExecutorHealthChecker(this.props, this.loader, this
+        .apiGateway, this.alerterHolder);
+    this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    this.flow1.setExecutionId(EXECUTION_ID_11);
+    this.flow1.setStatus(Status.RUNNING);
+    this.executor1 = new Executor(1, "localhost", 12345, true);
+    when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
+    when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
+  }
+
+  /**
+   * Test running flow is not finalized and alert email is not sent when executor is alive.
+   */
+  @Test
+  public void checkExecutorHealthAlive() throws Exception {
+    this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
+        new ExecutionReference(EXECUTION_ID_11, this.executor1), this.flow1));
+    when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+        ConnectorParams.PING_ACTION, null, null)).thenReturn(ImmutableMap.of(ConnectorParams
+        .STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE));
+    this.executorHealthChecker.checkExecutorHealth();
+    assertThat(this.flow1.getStatus()).isEqualTo(Status.RUNNING);
+    verifyZeroInteractions(this.alerterHolder);
+  }
+
+  /**
+   * Test running flow is finalized when its executor is removed from DB.
+   */
+  @Test
+  public void checkExecutorHealthExecutorIdRemoved() throws Exception {
+    this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
+        new ExecutionReference(EXECUTION_ID_11, null), this.flow1));
+    when(this.loader.fetchExecutableFlow(EXECUTION_ID_11)).thenReturn(this.flow1);
+    this.executorHealthChecker.checkExecutorHealth();
+    verify(this.loader).updateExecutableFlow(this.flow1);
+    assertThat(this.flow1.getStatus()).isEqualTo(Status.FAILED);
+  }
+
+  /**
+   * Test alert emails are sent when there are consecutive failures to contact the executor.
+   */
+  @Test
+  public void checkExecutorHealthConsecutiveFailures() throws Exception {
+    this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
+        new ExecutionReference(EXECUTION_ID_11, this.executor1), this.flow1));
+    // Failed to ping executor. Failure count (=1) < MAX_FAILURE_COUNT (=2). Do not alert.
+    this.executorHealthChecker.checkExecutorHealth();
+    verify(this.apiGateway).callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+        ConnectorParams.PING_ACTION, null, null);
+    verifyZeroInteractions(this.alerterHolder);
+
+    // Pinged executor successfully. Failure count (=0) < MAX_FAILURE_COUNT (=2). Do not alert.
+    when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+        ConnectorParams.PING_ACTION, null, null)).thenReturn(ImmutableMap.of(ConnectorParams
+        .STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE));
+    this.executorHealthChecker.checkExecutorHealth();
+    verifyZeroInteractions(this.alerterHolder);
+
+    // Failed to ping executor. Failure count (=1) < MAX_FAILURE_COUNT (=2). Do not alert.
+    when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+        ConnectorParams.PING_ACTION, null, null)).thenReturn(null);
+    this.executorHealthChecker.checkExecutorHealth();
+    verifyZeroInteractions(this.alerterHolder);
+
+    // Failed to ping executor again. Failure count (=2) = MAX_FAILURE_COUNT (=2). Alert AZ admin.
+    this.executorHealthChecker.checkExecutorHealth();
+    verify((this.alerterHolder).get("email"))
+        .alertOnFailedUpdate(eq(this.executor1), eq(Arrays.asList(this.flow1)),
+            any(ExecutorManagerException.class));
+    assertThat(this.flow1.getExecutionOptions().getFailureEmails()).isEqualTo
+        (Arrays.asList(AZ_ADMIN_ALERT_EMAIL.split(",")));
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 29ec76a..0136a93 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -104,7 +104,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
         if (action.equals(UPDATE_ACTION)) {
           handleAjaxUpdateRequest(req, respMap);
         } else if (action.equals(PING_ACTION)) {
-          respMap.put("status", "alive");
+          respMap.put(STATUS_PARAM, RESPONSE_ALIVE);
         } else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
           logger.info("Reloading Jobtype plugins");
           handleReloadJobTypePlugins(respMap);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index dc5ec57..4dee864 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -110,6 +110,7 @@ public class FlowRunnerManager implements EventListener,
 
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
   private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
+  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
 
   // this map is used to store the flows that have been submitted to
   // the executor service. Once a flow has been submitted, it is either
@@ -215,7 +216,7 @@ public class FlowRunnerManager implements EventListener,
     if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
       this.logger.info("Starting polling service.");
       this.pollingService = new PollingService(this.azkabanProps.getLong
-          (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, 1000));
+          (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, DEFAULT_POLLING_INTERVAL_MS));
       this.pollingService.start();
     }
   }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 7c8863f..ffacdf5 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -232,10 +232,7 @@ public class AzkabanWebServer extends AzkabanServer {
     /* This creates the Web Server instance */
     app = webServer;
 
-    // Todo jamiesjc: also start the threads in ExecutionController if needed.
-    if (webServer.executorManagerAdapter instanceof ExecutorManager) {
-      ((ExecutorManager) webServer.executorManagerAdapter).start();
-    }
+    webServer.executorManagerAdapter.start();
 
     // TODO refactor code into ServerProvider
     webServer.prepareAndStartServer();