diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 29d99ec..6ab1bb6 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -61,6 +61,8 @@ import azkaban.utils.Props;
*/
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+ static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+ "azkaban.use.multiple.executors";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
final private ExecutorLoader executorLoader;
@@ -121,7 +123,6 @@ public class ExecutorManager extends EventHandler implements
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
- disableQueueProcessorThread();
}
/**
@@ -158,7 +159,7 @@ public class ExecutorManager extends EventHandler implements
executorPort, true));
}
- if (azkProps.getBoolean("azkaban.use.multiple.executors", false)) {
+ if (azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false)) {
activeExecutors.addAll(executorLoader.fetchActiveExecutors());
}
@@ -167,10 +168,16 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /**
+ * Disable flow dispatching in QueueProcessor
+ */
public void disableQueueProcessorThread() {
queueProcessor.setActive(false);
}
+ /**
+ * Enable flow dispatching in QueueProcessor
+ */
public void enableQueueProcessorThread() {
queueProcessor.setActive(true);
}
@@ -179,6 +186,12 @@ public class ExecutorManager extends EventHandler implements
return queueProcessor.getState();
}
+ /**
+ * Returns state of QueueProcessor False, no flow is being dispatched True ,
+ * flows are being dispatched as expected
+ *
+ * @return
+ */
public boolean isQueueProcessorThreadActive() {
return queueProcessor.isActive();
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
new file mode 100644
index 0000000..3db34aa
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Test class for executor manager
+ */
+public class ExecutorManagerTest {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions/exectest1/";
+
+ private File getFlowDir(String flow) {
+ return new File(UNIT_BASE_DIR + flow + ".flow");
+ }
+
+ private User getTestUser() {
+ return new User("testUser");
+ }
+
+ /* Helper method to create a ExecutorManager Instance */
+ private ExecutorManager createMultiExecutorManagerInstance()
+ throws ExecutorManagerException {
+ return createMultiExecutorManagerInstance(new MockExecutorLoader());
+ }
+
+ /*
+ * Helper method to create a ExecutorManager Instance with the given
+ * ExecutorLoader
+ */
+ private ExecutorManager createMultiExecutorManagerInstance(
+ ExecutorLoader loader) throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ loader.addExecutor("localhost", 12345);
+ loader.addExecutor("localhost", 12346);
+ return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ }
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ private ExecutableFlow createExecutableFlow(String flowName)
+ throws IOException {
+ File jsonFlowFile = getFlowDir(flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+ return execFlow;
+ }
+
+ /*
+ * Test create an executor manager instance without any executor local or
+ * remote
+ */
+ @Test
+ public void testNoExecutorScenario() {
+ try {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ @SuppressWarnings("unused")
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /*
+ * Test backward compatibility with just local executor
+ */
+ @Test
+ public void testLocalExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put("executor.port", 12345);
+
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+
+ Assert.assertEquals(activeExecutors.size(), 1);
+ Executor executor = activeExecutors.iterator().next();
+ Assert.assertEquals(executor.getHost(), "localhost");
+ Assert.assertEquals(executor.getPort(), 12345);
+ Assert.assertArrayEquals(activeExecutors.toArray(), loader
+ .fetchActiveExecutors().toArray());
+ }
+
+ /*
+ * Test executor manager initialization with multiple executors
+ */
+ @Test
+ public void testMultipleExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+ Executor executor2 = loader.addExecutor("localhost", 12346);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+ Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+ executor1, executor2 });
+ }
+
+ /*
+ * Test executor manager active executor reload
+ */
+ @Test
+ public void testSetupExecutorsSucess() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+ Assert.assertArrayEquals(activeExecutors.toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ Executor executor2 = loader.addExecutor("localhost", 12346);
+ Executor executor3 = loader.addExecutor("localhost", 12347);
+ manager.setupExecutors();
+
+ Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+ executor2, executor3 });
+ }
+
+ /*
+ * Test executor manager active executor reload and resulting in no active
+ * executors
+ */
+ @Test
+ public void testSetupExecutorsException() {
+ try {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+ Assert.assertArrayEquals(activeExecutors.toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ manager.setupExecutors();
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test disabling queue process thread to pause dispatching */
+ @Test
+ public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ manager.disableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+ }
+
+ /* Test renabling queue process thread to pause restart dispatching */
+ @Test
+ public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+ manager.disableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+ manager.enableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+ }
+
+ /* Test submit a non-dispatched flow */
+ @Test
+ public void testQueuedFlows() throws ExecutorManagerException, IOException {
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ ExecutableFlow flow1 = createExecutableFlow("exec1");
+ flow1.setExecutionId(1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2");
+ flow2.setExecutionId(2);
+ manager.disableQueueProcessorThread();
+
+ User testUser = getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow2, testUser.getUserId());
+
+ List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
+ testFlows.add(flow1);
+ testFlows.add(flow2);
+
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+ loader.fetchQueuedFlows();
+ Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
+ // Verify things are correctly setup in db
+ for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+ Assert.assertTrue(testFlows.contains(pair.getSecond()));
+ }
+
+ // Verify running flows using old definition of "running" flows i.e. a
+ // non-dispatched flow is also considered running
+ List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+ Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
+ && testFlows.containsAll(managerActiveFlows));
+
+ // Verify getQueuedFlowIds method
+ Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
+ }
+
+ /* Test submit duplicate flow when previous instance is not dispatched */
+ @Test
+ public void testDuplicateQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ try {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ ExecutableFlow flow1 = createExecutableFlow("exec1");
+ flow1.getExecutionOptions().setConcurrentOption(
+ ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ manager.disableQueueProcessorThread();
+
+ User testUser = getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /*
+ * Test killing a job in preparation stage at webserver side i.e. a
+ * non-dispatched flow
+ */
+ @Test
+ public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ ExecutableFlow flow1 = createExecutableFlow("exec1");
+ manager.disableQueueProcessorThread();
+ User testUser = getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+
+ manager.cancelFlow(flow1, testUser.getUserId());
+ ExecutableFlow fetchedFlow =
+ loader.fetchExecutableFlow(flow1.getExecutionId());
+ Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+
+ Assert.assertFalse(manager.getRunningFlows().contains(flow1));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 6969a20..fec183c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -293,11 +293,12 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public Executor addExecutor(String host, int port)
throws ExecutorManagerException {
- if (fetchExecutor(host, port) != null) {
-
+ Executor executor = null;
+ if (fetchExecutor(host, port) == null) {
+ executorIdCounter++;
+ executor = new Executor(executorIdCounter, host, port, true);
+ executors.add(executor);
}
- executorIdCounter++;
- Executor executor = new Executor(executorIdCounter, host, port, true);
return executor;
}
@@ -360,7 +361,7 @@ public class MockExecutorLoader implements ExecutorLoader {
List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
for (int execId : refs.keySet()) {
- if (executionExecutorMapping.containsKey(execId)) {
+ if (!executionExecutorMapping.containsKey(execId)) {
queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
.get(execId), flows.get(execId)));
}