azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
index eb3f834..3050a8d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -22,11 +22,26 @@ import org.apache.log4j.Logger;
 
 import azkaban.utils.Pair;
 
+/**
+ * Comparator implicitly used in priority queue for QueuedExecutions.
+ */
 public final class ExecutableFlowPriorityComparator implements
   Comparator<Pair<ExecutionReference, ExecutableFlow>> {
   private static Logger logger = Logger
     .getLogger(ExecutableFlowPriorityComparator.class);
 
+  /**
+   * <pre>
+   * Sorting order is determined by:-
+   * 1. descending order of priority
+   * 2. if same priority, ascending order of update time
+   * 3. if same priority and updateTime, ascending order of execution id
+   * </pre>
+   *
+   * {@inheritDoc}
+   *
+   * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+   */
   @Override
   public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
     Pair<ExecutionReference, ExecutableFlow> pair2) {
@@ -47,11 +62,11 @@ public final class ExecutableFlowPriorityComparator implements
       // descending order of priority
       int diff = getPriority(exflow2) - getPriority(exflow1);
       if (diff == 0) {
-        // increasing order of update time, if same priority
+        // ascending order of update time, if same priority
         diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
       }
       if (diff == 0) {
-        // increasing order of execution id, if same priority and updateTime
+        // ascending order of execution id, if same priority and updateTime
         diff = exflow1.getExecutionId() - exflow2.getExecutionId();
       }
       return diff;
@@ -72,8 +87,9 @@ public final class ExecutableFlowPriorityComparator implements
             ExecutionOptions.FLOW_PRIORITY));
       } catch (NumberFormatException ex) {
         priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
-        logger.error("Failed to parse flow priority for exec_id = "
-          + exflow.getExecutionId());
+        logger.error(
+          "Failed to parse flow priority for exec_id = "
+            + exflow.getExecutionId(), ex);
       }
     }
     return priority;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index 82d6e80..9d93476 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -29,6 +29,10 @@ public class ExecutionReference {
   }
 
   public ExecutionReference(int execId, Executor executor) {
+    if (executor == null) {
+      throw new IllegalArgumentException(String.format(
+        "Executor cannot be null for exec id: %d ExecutionReference", execId));
+    }
     this.execId = execId;
     this.executor = executor;
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index a81b95d..31070bf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -107,9 +107,4 @@ public class Executor {
   public void setActive(boolean isActive) {
     this.isActive = isActive;
   }
-
-  @Override
-  public String toString() {
-    return String.format("[%d]%s:%d", id, host, port);
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index ab9859e..c97ef00 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1667,7 +1667,8 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /*
-   * This thread is responsible for processing queued flows.
+   * This thread is responsible for processing queued flows using dispatcher and
+   * making rest api calls to executor server
    */
   private class QueueProcessorThread extends Thread {
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
@@ -1675,8 +1676,8 @@ public class ExecutorManager extends EventHandler implements
     private final long activeExecutorRefreshWindowInMilisec;
     private final int activeExecutorRefreshWindowInFlows;
 
-    private boolean shutdown = false;
-    private boolean isActive = true;
+    private volatile boolean shutdown = false;
+    private volatile boolean isActive = true;
 
     public QueueProcessorThread(boolean isActive,
       long activeExecutorRefreshWindowInTime,
@@ -1691,6 +1692,7 @@ public class ExecutorManager extends EventHandler implements
 
     public void setActive(boolean isActive) {
       this.isActive = isActive;
+      logger.info("QueueProcessorThread active turned " + this.isActive);
     }
 
     public boolean isActive() {
@@ -1714,7 +1716,7 @@ public class ExecutorManager extends EventHandler implements
             }
             wait(QUEUE_PROCESSOR_WAIT_IN_MS);
           } catch (Exception e) {
-            logger.info(
+            logger.error(
               "QueueProcessorThread Interrupted. Probably to shut down.", e);
           }
         }
@@ -1734,9 +1736,14 @@ public class ExecutorManager extends EventHandler implements
         ExecutableFlow exflow = runningCandidate.getSecond();
 
         long currentTime = System.currentTimeMillis();
+
+        // if we have dispatched more than maxContinuousFlowProcessed or
+        // It has been more then activeExecutorsRefreshWindow millisec since we
+        // refreshed
         if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
           || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
-          refreshExecutors(); // Refresh executor stats to be used by selector
+          // Refresh executorInfo for all activeExecutors
+          refreshExecutors();
           lastExecutorRefreshTime = currentTime;
           currentContinuousFlowProcessed = 0;
         }
@@ -1757,7 +1764,7 @@ public class ExecutorManager extends EventHandler implements
           try {
             dispatch(reference, exflow, selectedExecutor);
           } catch (ExecutorManagerException e) {
-            logger.debug(String.format(
+            logger.warn(String.format(
               "Executor %s responded with exception for exec: %d",
               selectedExecutor, exflow.getExecutionId()), e);
             handleDispatchExceptionCase(reference, exflow, selectedExecutor,
@@ -1781,6 +1788,11 @@ public class ExecutorManager extends EventHandler implements
     private void handleDispatchExceptionCase(ExecutionReference reference,
       ExecutableFlow exflow, Executor lastSelectedExecutor,
       Set<Executor> remainingExecutors) throws ExecutorManagerException {
+      logger
+        .info(String
+          .format(
+            "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+            exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
       reference.setExecutor(null);
       if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
@@ -1796,6 +1808,11 @@ public class ExecutorManager extends EventHandler implements
 
     private void handleNoExecutorSelectedCase(ExecutionReference reference,
       ExecutableFlow exflow) throws ExecutorManagerException {
+      logger
+      .info(String
+        .format(
+          "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+          exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
       // Scenario: when dispatcher didn't assigned any executor
       if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
@@ -1820,6 +1837,10 @@ public class ExecutorManager extends EventHandler implements
       // move from flow to running flows
       runningFlows.put(exflow.getExecutionId(),
         new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+      logger.info(String.format(
+        "Successfully dispatched exec %d with error count %d",
+        exflow.getExecutionId(), reference.getNumErrors()));
     }
   }
 }
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index f9850d6..ed6543f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -45,11 +45,12 @@ public class ExecutableFlowPriorityComparatorTest {
 
   /* Helper method to create an ExecutableFlow from serialized description */
   private ExecutableFlow createExecutableFlow(String flowName, int priority,
-    long updateTime) throws IOException {
+    long updateTime, int executionId) throws IOException {
     ExecutableFlow execFlow =
       TestUtils.createExecutableFlow("exectest1", flowName);
 
     execFlow.setUpdateTime(updateTime);
+    execFlow.setExecutionId(executionId);
     if (priority > 0) {
       execFlow.getExecutionOptions().getFlowParameters()
         .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
@@ -61,9 +62,9 @@ public class ExecutableFlowPriorityComparatorTest {
   @Test
   public void testExplicitlySpecifiedPriorities() throws IOException,
     InterruptedException {
-    ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3);
-    ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3);
-    ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3);
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3, 3);
     ExecutionReference dummyRef = new ExecutionReference(0);
 
     BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
@@ -82,9 +83,9 @@ public class ExecutableFlowPriorityComparatorTest {
   @Test
   public void testMixedSpecifiedPriorities() throws IOException,
     InterruptedException {
-    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3);
-    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3);
-    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3);
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
     ExecutionReference dummyRef = new ExecutionReference(0);
 
     BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
@@ -100,15 +101,43 @@ public class ExecutableFlowPriorityComparatorTest {
   }
 
   /*
-   * priority queue order when some priorities are equal, execution Id is used
-   * in this case
+   * priority queue order when some priorities are equal, updatetime is used in
+   * this case
    */
   @Test
   public void testEqualPriorities() throws IOException, InterruptedException {
-    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1);
-    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
-    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3);
-    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4);
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow4, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+
+  /*
+   * priority queue order when some priorities and updatetime are equal,
+   * execution Id is used in this case
+   */
+  @Test
+  public void testEqualUpdateTimeAndPriority() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 2, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
     ExecutionReference dummyRef = new ExecutionReference(0);
 
     BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =