Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index afcea5a..4c4bca5 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -94,6 +94,11 @@ public class Constants {
public static final String AZKABAN_POLL_MODEL = "azkaban.poll.model";
public static final String AZKABAN_POLLING_INTERVAL_MS = "azkaban.polling.interval.ms";
+ // Configures properties for Azkaban executor health check
+ public static final String AZKABAN_EXECUTOR_HEALTHCHECK_INTERVAL_MIN = "azkaban.executor.healthcheck.interval.min";
+ public static final String AZKABAN_EXECUTOR_MAX_FAILURE_COUNT = "azkaban.executor.max.failurecount";
+ public static final String AZKABAN_ADMIN_ALERT_EMAIL = "azkaban.admin.alert.email";
+
// Configures Azkaban Flow Version in project YAML file
public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 0de0713..20e4f15 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -55,14 +55,17 @@ public class ExecutionController extends EventHandler implements ExecutorManager
private final ExecutorLoader executorLoader;
private final ExecutorApiGateway apiGateway;
private final AlerterHolder alerterHolder;
+ private final ExecutorHealthChecker executorHealthChecker;
private final int maxConcurrentRunsOneFlow;
@Inject
ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
- final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder) {
+ final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder, final
+ ExecutorHealthChecker executorHealthChecker) {
this.executorLoader = executorLoader;
this.apiGateway = apiGateway;
this.alerterHolder = alerterHolder;
+ this.executorHealthChecker = executorHealthChecker;
this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
}
@@ -112,7 +115,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
try {
executors = this.executorLoader.fetchActiveExecutors();
} catch (final ExecutorManagerException e) {
- this.logger.error("Failed to get all active executors.", e);
+ logger.error("Failed to get all active executors.", e);
}
return executors;
}
@@ -130,7 +133,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
ports.add(executor.getHost() + ":" + executor.getPort());
}
} catch (final ExecutorManagerException e) {
- this.logger.error("Failed to get primary server hosts.", e);
+ logger.error("Failed to get primary server hosts.", e);
}
return ports;
}
@@ -149,7 +152,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
}
}
} catch (final ExecutorManagerException e) {
- this.logger.error("Failed to get all active executor server hosts.", e);
+ logger.error("Failed to get all active executor server hosts.", e);
}
return ports;
}
@@ -167,7 +170,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
this.executorLoader.fetchUnfinishedFlows().values()));
} catch (final ExecutorManagerException e) {
- this.logger.error("Failed to get running flows for project " + projectId + ", flow "
+ logger.error("Failed to get running flows for project " + projectId + ", flow "
+ flowId, e);
}
return executionIds;
@@ -193,7 +196,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
try {
getActiveFlowsWithExecutorHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
} catch (final ExecutorManagerException e) {
- this.logger.error("Failed to get active flows with executor.", e);
+ logger.error("Failed to get active flows with executor.", e);
}
return flows;
}
@@ -220,7 +223,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
this.executorLoader.fetchUnfinishedFlows().values());
} catch (final ExecutorManagerException e) {
- this.logger.error(
+ logger.error(
"Failed to check if the flow is running for project " + projectId + ", flow " + flowId,
e);
}
@@ -257,7 +260,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
try {
getFlowsHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
} catch (final ExecutorManagerException e) {
- this.logger.error("Failed to get running flows.", e);
+ logger.error("Failed to get running flows.", e);
}
return flows;
}
@@ -669,10 +672,14 @@ public class ExecutionController extends EventHandler implements ExecutorManager
Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
}
+ @Override
+ public void start() {
+ this.executorHealthChecker.start();
+ }
@Override
public void shutdown() {
- //Todo: shutdown any thread that is running
+ this.executorHealthChecker.shutdown();
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
new file mode 100644
index 0000000..ac774e0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
@@ -0,0 +1,180 @@
+/*
+* Copyright 2019 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically checks the health of executors. Finalizes flows or sends alert emails when needed.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+@Singleton
+public class ExecutorHealthChecker {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExecutorHealthChecker.class);
+ // Max number of executor failures before sending out alert emails.
+ private static final int DEFAULT_EXECUTOR_MAX_FAILURE_COUNT = 6;
+ // Web server checks executor health every 5 min by default.
+ private static final Duration DEFAULT_EXECUTOR_HEALTHCHECK_INTERVAL = Duration.ofMinutes(5);
+ private final long healthCheckIntervalMin;
+ private final int executorMaxFailureCount;
+ private final List<String> alertEmails;
+ private final ScheduledExecutorService scheduler;
+ private final ExecutorLoader executorLoader;
+ private final ExecutorApiGateway apiGateway;
+ private final AlerterHolder alerterHolder;
+ private final Map<Integer, Integer> executorFailureCount = new HashMap<>();
+
+ @Inject
+ public ExecutorHealthChecker(final Props azkProps, final ExecutorLoader executorLoader,
+ final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder) {
+ this.healthCheckIntervalMin = azkProps
+ .getLong(ConfigurationKeys.AZKABAN_EXECUTOR_HEALTHCHECK_INTERVAL_MIN,
+ DEFAULT_EXECUTOR_HEALTHCHECK_INTERVAL.toMinutes());
+ this.executorMaxFailureCount = azkProps.getInt(ConfigurationKeys
+ .AZKABAN_EXECUTOR_MAX_FAILURE_COUNT, DEFAULT_EXECUTOR_MAX_FAILURE_COUNT);
+ this.alertEmails = azkProps.getStringList(ConfigurationKeys.AZKABAN_ADMIN_ALERT_EMAIL);
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ this.executorLoader = executorLoader;
+ this.apiGateway = apiGateway;
+ this.alerterHolder = alerterHolder;
+ }
+
+ public void start() {
+ logger.info("Starting executor health checker.");
+ this.scheduler.scheduleAtFixedRate(() -> checkExecutorHealth(), 0L, this.healthCheckIntervalMin,
+ TimeUnit.MINUTES);
+ }
+
+ public void shutdown() {
+ this.scheduler.shutdown();
+ try {
+ if (!this.scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ this.scheduler.shutdownNow();
+ }
+ } catch (final InterruptedException ex) {
+ this.scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Checks executor health. Finalizes the flow if its executor is already removed from DB or
+ * sends alert emails if the executor isn't alive any more.
+ */
+ public void checkExecutorHealth() {
+ final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
+ for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap.entrySet()) {
+ final Optional<Executor> executorOption = entry.getKey();
+ if (!executorOption.isPresent()) {
+ final String finalizeReason = "Executor id of this execution doesn't exist.";
+ for (final ExecutableFlow flow : entry.getValue()) {
+ logger.warn(
+ String.format("Finalizing execution %s, %s", flow.getExecutionId(), finalizeReason));
+ ExecutionControllerUtils
+ .finalizeFlow(this.executorLoader, this.alerterHolder, flow, finalizeReason, null);
+ }
+ continue;
+ }
+
+ final Executor executor = executorOption.get();
+ try {
+ // Todo jamiesjc: add metrics to monitor the http call return time
+ final Map<String, Object> results = this.apiGateway
+ .callWithExecutionId(executor.getHost(), executor.getPort(),
+ ConnectorParams.PING_ACTION, null, null);
+ if (results == null || results.containsKey(ConnectorParams.RESPONSE_ERROR) || !results
+ .containsKey(ConnectorParams.STATUS_PARAM) || !results.get(ConnectorParams.STATUS_PARAM)
+ .equals(ConnectorParams.RESPONSE_ALIVE)) {
+ throw new ExecutorManagerException("Status of executor " + executor.getId() + " is "
+ + "not alive.");
+ } else {
+ // Executor is alive. Clear the failure count.
+ if (this.executorFailureCount.containsKey(executor.getId())) {
+ this.executorFailureCount.put(executor.getId(), 0);
+ }
+ }
+ } catch (final ExecutorManagerException e) {
+ handleExecutorNotAliveCase(entry, executor, e);
+ }
+ }
+ }
+
+ /**
+ * Groups Executable flow by Executors to reduce number of REST calls.
+ *
+ * @return executor to list of flows map
+ */
+ private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
+ final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap = new HashMap<>();
+ try {
+ for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this
+ .executorLoader.fetchActiveFlows().values()) {
+ final Optional<Executor> executor = runningFlow.getFirst().getExecutor();
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
+ if (flows == null) {
+ flows = new ArrayList<>();
+ exFlowMap.put(executor, flows);
+ }
+ flows.add(runningFlow.getSecond());
+ }
+ } catch (final ExecutorManagerException e) {
+ logger.error("Failed to get flow to executor map");
+ }
+ return exFlowMap;
+ }
+
+ /**
+ * Increments executor failure count. If it reaches max failure count, sends alert emails to AZ
+ * admin.
+ *
+ * @param entry executor to list of flows map entry
+ * @param executor the executor
+ * @param e Exception thrown when the executor is not alive
+ */
+ private void handleExecutorNotAliveCase(
+ final Entry<Optional<Executor>, List<ExecutableFlow>> entry, final Executor executor,
+ final ExecutorManagerException e) {
+ logger.error("Failed to get update from executor " + executor.getId(), e);
+ this.executorFailureCount.put(executor.getId(), this.executorFailureCount.getOrDefault
+ (executor.getId(), 0) + 1);
+ if (this.executorFailureCount.get(executor.getId()) % this.executorMaxFailureCount == 0
+ && !this.alertEmails.isEmpty()) {
+ entry.getValue().stream().forEach(flow -> flow
+ .getExecutionOptions().setFailureEmails(this.alertEmails));
+ logger.info(String.format("Executor failure count is %d. Sending alert emails to %s.",
+ this.executorFailureCount.get(executor.getId()), this.alertEmails));
+ this.alerterHolder.get("email").alertOnFailedUpdate(executor, entry.getValue(), e);
+ }
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 0125778..f852732 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -164,6 +164,7 @@ public class ExecutorManager extends EventHandler implements
this.queueProcessor = setupQueueProcessor();
}
+ @Override
public void start() throws ExecutorManagerException {
initialize();
this.updaterThread.start();
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 907ab1c..e229153 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -116,6 +116,8 @@ public interface ExecutorManagerAdapter {
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException;
+ public void start() throws ExecutorManagerException;
+
public void shutdown();
public Set<String> getAllActiveExecutorServerHosts();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index 9b8ab3c..209a75d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -34,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -51,6 +50,7 @@ public class ExecutionControllerTest {
private ExecutorLoader loader;
private ExecutorApiGateway apiGateway;
private AlerterHolder alertHolder;
+ private ExecutorHealthChecker executorHealthChecker;
private Props props;
private User user;
private ExecutableFlow flow1;
@@ -69,8 +69,9 @@ public class ExecutionControllerTest {
this.apiGateway = mock(ExecutorApiGateway.class);
this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
this.alertHolder = mock(AlerterHolder.class);
+ this.executorHealthChecker = mock(ExecutorHealthChecker.class);
this.controller = new ExecutionController(this.props, this.loader, this.apiGateway,
- this.alertHolder);
+ this.alertHolder, this.executorHealthChecker);
final Executor executor1 = new Executor(1, "localhost", 12345, true);
final Executor executor2 = new Executor(2, "localhost", 12346, true);
@@ -99,13 +100,6 @@ public class ExecutionControllerTest {
when(this.loader.fetchQueuedFlows()).thenReturn(this.queuedFlows);
}
- @After
- public void tearDown() {
- if (this.controller != null) {
- this.controller.shutdown();
- }
- }
-
@Test
public void testFetchAllActiveFlows() throws Exception {
initializeUnfinishedFlows();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorHealthCheckerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorHealthCheckerTest.java
new file mode 100644
index 0000000..ef29bd2
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorHealthCheckerTest.java
@@ -0,0 +1,136 @@
+/*
+* Copyright 2019 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.alert.Alerter;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test case for executor health checker.
+ */
+public class ExecutorHealthCheckerTest {
+
+ private static final int EXECUTION_ID_11 = 11;
+ private static final String AZ_ADMIN_ALERT_EMAIL = "az_admin1@foo.com,az_admin2@foo.com";
+ private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+ private ExecutorHealthChecker executorHealthChecker;
+ private Props props;
+ private ExecutorLoader loader;
+ private ExecutorApiGateway apiGateway;
+ private Alerter mailAlerter;
+ private AlerterHolder alerterHolder;
+ private ExecutableFlow flow1;
+ private Executor executor1;
+
+ @Before
+ public void setUp() throws Exception {
+ this.props = new Props();
+ this.props.put(ConfigurationKeys.AZKABAN_EXECUTOR_MAX_FAILURE_COUNT, 2);
+ this.props.put(ConfigurationKeys.AZKABAN_ADMIN_ALERT_EMAIL, AZ_ADMIN_ALERT_EMAIL);
+ this.loader = mock(ExecutorLoader.class);
+ this.mailAlerter = mock(Alerter.class);
+ this.alerterHolder = mock(AlerterHolder.class);
+ this.apiGateway = mock(ExecutorApiGateway.class);
+ this.executorHealthChecker = new ExecutorHealthChecker(this.props, this.loader, this
+ .apiGateway, this.alerterHolder);
+ this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ this.flow1.setExecutionId(EXECUTION_ID_11);
+ this.flow1.setStatus(Status.RUNNING);
+ this.executor1 = new Executor(1, "localhost", 12345, true);
+ when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
+ when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
+ }
+
+ /**
+ * Test running flow is not finalized and alert email is not sent when executor is alive.
+ */
+ @Test
+ public void checkExecutorHealthAlive() throws Exception {
+ this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
+ new ExecutionReference(EXECUTION_ID_11, this.executor1), this.flow1));
+ when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+ ConnectorParams.PING_ACTION, null, null)).thenReturn(ImmutableMap.of(ConnectorParams
+ .STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE));
+ this.executorHealthChecker.checkExecutorHealth();
+ assertThat(this.flow1.getStatus()).isEqualTo(Status.RUNNING);
+ verifyZeroInteractions(this.alerterHolder);
+ }
+
+ /**
+ * Test running flow is finalized when its executor is removed from DB.
+ */
+ @Test
+ public void checkExecutorHealthExecutorIdRemoved() throws Exception {
+ this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
+ new ExecutionReference(EXECUTION_ID_11, null), this.flow1));
+ when(this.loader.fetchExecutableFlow(EXECUTION_ID_11)).thenReturn(this.flow1);
+ this.executorHealthChecker.checkExecutorHealth();
+ verify(this.loader).updateExecutableFlow(this.flow1);
+ assertThat(this.flow1.getStatus()).isEqualTo(Status.FAILED);
+ }
+
+ /**
+ * Test alert emails are sent when there are consecutive failures to contact the executor.
+ */
+ @Test
+ public void checkExecutorHealthConsecutiveFailures() throws Exception {
+ this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
+ new ExecutionReference(EXECUTION_ID_11, this.executor1), this.flow1));
+ // Failed to ping executor. Failure count (=1) < MAX_FAILURE_COUNT (=2). Do not alert.
+ this.executorHealthChecker.checkExecutorHealth();
+ verify(this.apiGateway).callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+ ConnectorParams.PING_ACTION, null, null);
+ verifyZeroInteractions(this.alerterHolder);
+
+ // Pinged executor successfully. Failure count (=0) < MAX_FAILURE_COUNT (=2). Do not alert.
+ when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+ ConnectorParams.PING_ACTION, null, null)).thenReturn(ImmutableMap.of(ConnectorParams
+ .STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE));
+ this.executorHealthChecker.checkExecutorHealth();
+ verifyZeroInteractions(this.alerterHolder);
+
+ // Failed to ping executor. Failure count (=1) < MAX_FAILURE_COUNT (=2). Do not alert.
+ when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
+ ConnectorParams.PING_ACTION, null, null)).thenReturn(null);
+ this.executorHealthChecker.checkExecutorHealth();
+ verifyZeroInteractions(this.alerterHolder);
+
+ // Failed to ping executor again. Failure count (=2) = MAX_FAILURE_COUNT (=2). Alert AZ admin.
+ this.executorHealthChecker.checkExecutorHealth();
+ verify((this.alerterHolder).get("email"))
+ .alertOnFailedUpdate(eq(this.executor1), eq(Arrays.asList(this.flow1)),
+ any(ExecutorManagerException.class));
+ assertThat(this.flow1.getExecutionOptions().getFailureEmails()).isEqualTo
+ (Arrays.asList(AZ_ADMIN_ALERT_EMAIL.split(",")));
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 29ec76a..0136a93 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -104,7 +104,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
if (action.equals(UPDATE_ACTION)) {
handleAjaxUpdateRequest(req, respMap);
} else if (action.equals(PING_ACTION)) {
- respMap.put("status", "alive");
+ respMap.put(STATUS_PARAM, RESPONSE_ALIVE);
} else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
logger.info("Reloading Jobtype plugins");
handleReloadJobTypePlugins(respMap);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index dc5ec57..4dee864 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -110,6 +110,7 @@ public class FlowRunnerManager implements EventListener,
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
+ private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
// this map is used to store the flows that have been submitted to
// the executor service. Once a flow has been submitted, it is either
@@ -215,7 +216,7 @@ public class FlowRunnerManager implements EventListener,
if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
this.logger.info("Starting polling service.");
this.pollingService = new PollingService(this.azkabanProps.getLong
- (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, 1000));
+ (ConfigurationKeys.AZKABAN_POLLING_INTERVAL_MS, DEFAULT_POLLING_INTERVAL_MS));
this.pollingService.start();
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 7c8863f..ffacdf5 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -232,10 +232,7 @@ public class AzkabanWebServer extends AzkabanServer {
/* This creates the Web Server instance */
app = webServer;
- // Todo jamiesjc: also start the threads in ExecutionController if needed.
- if (webServer.executorManagerAdapter instanceof ExecutorManager) {
- ((ExecutorManager) webServer.executorManagerAdapter).start();
- }
+ webServer.executorManagerAdapter.start();
// TODO refactor code into ServerProvider
webServer.prepareAndStartServer();