azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 29d99ec..6ab1bb6 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -61,6 +61,8 @@ import azkaban.utils.Props;
  */
 public class ExecutorManager extends EventHandler implements
   ExecutorManagerAdapter {
+  static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+    "azkaban.use.multiple.executors";
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   final private ExecutorLoader executorLoader;
 
@@ -121,7 +123,6 @@ public class ExecutorManager extends EventHandler implements
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
-    disableQueueProcessorThread();
   }
 
   /**
@@ -158,7 +159,7 @@ public class ExecutorManager extends EventHandler implements
         executorPort, true));
     }
 
-    if (azkProps.getBoolean("azkaban.use.multiple.executors", false)) {
+    if (azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false)) {
       activeExecutors.addAll(executorLoader.fetchActiveExecutors());
     }
 
@@ -167,10 +168,16 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  /**
+   * Disable flow dispatching in QueueProcessor
+   */
   public void disableQueueProcessorThread() {
     queueProcessor.setActive(false);
   }
 
+  /**
+   * Enable flow dispatching in QueueProcessor
+   */
   public void enableQueueProcessorThread() {
     queueProcessor.setActive(true);
   }
@@ -179,6 +186,12 @@ public class ExecutorManager extends EventHandler implements
     return queueProcessor.getState();
   }
 
+  /**
+   * Returns state of QueueProcessor False, no flow is being dispatched True ,
+   * flows are being dispatched as expected
+   *
+   * @return
+   */
   public boolean isQueueProcessorThreadActive() {
     return queueProcessor.isActive();
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
new file mode 100644
index 0000000..3db34aa
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Test class for executor manager
+ */
+public class ExecutorManagerTest {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions/exectest1/";
+
+  private File getFlowDir(String flow) {
+    return new File(UNIT_BASE_DIR + flow + ".flow");
+  }
+
+  private User getTestUser() {
+    return new User("testUser");
+  }
+
+  /* Helper method to create a ExecutorManager Instance */
+  private ExecutorManager createMultiExecutorManagerInstance()
+    throws ExecutorManagerException {
+    return createMultiExecutorManagerInstance(new MockExecutorLoader());
+  }
+
+  /*
+   * Helper method to create a ExecutorManager Instance with the given
+   * ExecutorLoader
+   */
+  private ExecutorManager createMultiExecutorManagerInstance(
+    ExecutorLoader loader) throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    loader.addExecutor("localhost", 12345);
+    loader.addExecutor("localhost", 12346);
+    return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+  }
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  private ExecutableFlow createExecutableFlow(String flowName)
+    throws IOException {
+    File jsonFlowFile = getFlowDir(flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+    return execFlow;
+  }
+
+  /*
+   * Test create an executor manager instance without any executor local or
+   * remote
+   */
+  @Test
+  public void testNoExecutorScenario() {
+    try {
+      Props props = new Props();
+      props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+      ExecutorLoader loader = new MockExecutorLoader();
+      @SuppressWarnings("unused")
+      ExecutorManager manager =
+        new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /*
+   * Test backward compatibility with just local executor
+   */
+  @Test
+  public void testLocalExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put("executor.port", 12345);
+
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+
+    Assert.assertEquals(activeExecutors.size(), 1);
+    Executor executor = activeExecutors.iterator().next();
+    Assert.assertEquals(executor.getHost(), "localhost");
+    Assert.assertEquals(executor.getPort(), 12345);
+    Assert.assertArrayEquals(activeExecutors.toArray(), loader
+      .fetchActiveExecutors().toArray());
+  }
+
+  /*
+   * Test executor manager initialization with multiple executors
+   */
+  @Test
+  public void testMultipleExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+    Executor executor2 = loader.addExecutor("localhost", 12346);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+      executor1, executor2 });
+  }
+
+  /*
+   * Test executor manager active executor reload
+   */
+  @Test
+  public void testSetupExecutorsSucess() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+    Assert.assertArrayEquals(activeExecutors.toArray(),
+      new Executor[] { executor1 });
+
+    // mark older executor as inactive
+    executor1.setActive(false);
+    loader.updateExecutor(executor1);
+    Executor executor2 = loader.addExecutor("localhost", 12346);
+    Executor executor3 = loader.addExecutor("localhost", 12347);
+    manager.setupExecutors();
+
+    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+      executor2, executor3 });
+  }
+
+  /*
+   * Test executor manager active executor reload and resulting in no active
+   * executors
+   */
+  @Test
+  public void testSetupExecutorsException() {
+    try {
+      Props props = new Props();
+      props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+      ExecutorLoader loader = new MockExecutorLoader();
+      Executor executor1 = loader.addExecutor("localhost", 12345);
+
+      ExecutorManager manager =
+        new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+      Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+      Assert.assertArrayEquals(activeExecutors.toArray(),
+        new Executor[] { executor1 });
+
+      // mark older executor as inactive
+      executor1.setActive(false);
+      loader.updateExecutor(executor1);
+      manager.setupExecutors();
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test disabling queue process thread to pause dispatching */
+  @Test
+  public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    manager.disableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+  }
+
+  /* Test renabling queue process thread to pause restart dispatching */
+  @Test
+  public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+    manager.disableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+    manager.enableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+  }
+
+  /* Test submit a non-dispatched flow */
+  @Test
+  public void testQueuedFlows() throws ExecutorManagerException, IOException {
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    ExecutableFlow flow1 = createExecutableFlow("exec1");
+    flow1.setExecutionId(1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2");
+    flow2.setExecutionId(2);
+    manager.disableQueueProcessorThread();
+
+    User testUser = getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+    manager.submitExecutableFlow(flow2, testUser.getUserId());
+
+    List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
+    testFlows.add(flow1);
+    testFlows.add(flow2);
+
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+      loader.fetchQueuedFlows();
+    Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
+    // Verify things are correctly setup in db
+    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+      Assert.assertTrue(testFlows.contains(pair.getSecond()));
+    }
+
+    // Verify running flows using old definition of "running" flows i.e. a
+    // non-dispatched flow is also considered running
+    List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+    Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
+      && testFlows.containsAll(managerActiveFlows));
+
+    // Verify getQueuedFlowIds method
+    Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
+  }
+
+  /* Test submit duplicate flow when previous instance is not dispatched */
+  @Test
+  public void testDuplicateQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    try {
+      ExecutorManager manager = createMultiExecutorManagerInstance();
+      ExecutableFlow flow1 = createExecutableFlow("exec1");
+      flow1.getExecutionOptions().setConcurrentOption(
+        ExecutionOptions.CONCURRENT_OPTION_SKIP);
+      manager.disableQueueProcessorThread();
+
+      User testUser = getTestUser();
+      manager.submitExecutableFlow(flow1, testUser.getUserId());
+      manager.submitExecutableFlow(flow1, testUser.getUserId());
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /*
+   * Test killing a job in preparation stage at webserver side i.e. a
+   * non-dispatched flow
+   */
+  @Test
+  public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    ExecutableFlow flow1 = createExecutableFlow("exec1");
+    manager.disableQueueProcessorThread();
+    User testUser = getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+
+    manager.cancelFlow(flow1, testUser.getUserId());
+    ExecutableFlow fetchedFlow =
+      loader.fetchExecutableFlow(flow1.getExecutionId());
+    Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+
+    Assert.assertFalse(manager.getRunningFlows().contains(flow1));
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 6969a20..fec183c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -293,11 +293,12 @@ public class MockExecutorLoader implements ExecutorLoader {
   @Override
   public Executor addExecutor(String host, int port)
     throws ExecutorManagerException {
-    if (fetchExecutor(host, port) != null) {
-
+    Executor executor = null;
+    if (fetchExecutor(host, port) == null) {
+      executorIdCounter++;
+      executor = new Executor(executorIdCounter, host, port, true);
+      executors.add(executor);
     }
-    executorIdCounter++;
-    Executor executor = new Executor(executorIdCounter, host, port, true);
     return executor;
   }
 
@@ -360,7 +361,7 @@ public class MockExecutorLoader implements ExecutorLoader {
     List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
       new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
     for (int execId : refs.keySet()) {
-      if (executionExecutorMapping.containsKey(execId)) {
+      if (!executionExecutorMapping.containsKey(execId)) {
         queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
           .get(execId), flows.get(execId)));
       }