diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
index eb3f834..3050a8d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -22,11 +22,26 @@ import org.apache.log4j.Logger;
import azkaban.utils.Pair;
+/**
+ * Comparator implicitly used in priority queue for QueuedExecutions.
+ */
public final class ExecutableFlowPriorityComparator implements
Comparator<Pair<ExecutionReference, ExecutableFlow>> {
private static Logger logger = Logger
.getLogger(ExecutableFlowPriorityComparator.class);
+ /**
+ * <pre>
+ * Sorting order is determined by:-
+ * 1. descending order of priority
+ * 2. if same priority, ascending order of update time
+ * 3. if same priority and updateTime, ascending order of execution id
+ * </pre>
+ *
+ * {@inheritDoc}
+ *
+ * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+ */
@Override
public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
Pair<ExecutionReference, ExecutableFlow> pair2) {
@@ -47,11 +62,11 @@ public final class ExecutableFlowPriorityComparator implements
// descending order of priority
int diff = getPriority(exflow2) - getPriority(exflow1);
if (diff == 0) {
- // increasing order of update time, if same priority
+ // ascending order of update time, if same priority
diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
}
if (diff == 0) {
- // increasing order of execution id, if same priority and updateTime
+ // ascending order of execution id, if same priority and updateTime
diff = exflow1.getExecutionId() - exflow2.getExecutionId();
}
return diff;
@@ -72,8 +87,9 @@ public final class ExecutableFlowPriorityComparator implements
ExecutionOptions.FLOW_PRIORITY));
} catch (NumberFormatException ex) {
priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
- logger.error("Failed to parse flow priority for exec_id = "
- + exflow.getExecutionId());
+ logger.error(
+ "Failed to parse flow priority for exec_id = "
+ + exflow.getExecutionId(), ex);
}
}
return priority;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index ab9859e..c97ef00 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1667,7 +1667,8 @@ public class ExecutorManager extends EventHandler implements
}
/*
- * This thread is responsible for processing queued flows.
+ * This thread is responsible for processing queued flows using dispatcher and
+ * making rest api calls to executor server
*/
private class QueueProcessorThread extends Thread {
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
@@ -1675,8 +1676,8 @@ public class ExecutorManager extends EventHandler implements
private final long activeExecutorRefreshWindowInMilisec;
private final int activeExecutorRefreshWindowInFlows;
- private boolean shutdown = false;
- private boolean isActive = true;
+ private volatile boolean shutdown = false;
+ private volatile boolean isActive = true;
public QueueProcessorThread(boolean isActive,
long activeExecutorRefreshWindowInTime,
@@ -1691,6 +1692,7 @@ public class ExecutorManager extends EventHandler implements
public void setActive(boolean isActive) {
this.isActive = isActive;
+ logger.info("QueueProcessorThread active turned " + this.isActive);
}
public boolean isActive() {
@@ -1714,7 +1716,7 @@ public class ExecutorManager extends EventHandler implements
}
wait(QUEUE_PROCESSOR_WAIT_IN_MS);
} catch (Exception e) {
- logger.info(
+ logger.error(
"QueueProcessorThread Interrupted. Probably to shut down.", e);
}
}
@@ -1734,9 +1736,14 @@ public class ExecutorManager extends EventHandler implements
ExecutableFlow exflow = runningCandidate.getSecond();
long currentTime = System.currentTimeMillis();
+
+ // if we have dispatched more than maxContinuousFlowProcessed or
+ // It has been more then activeExecutorsRefreshWindow millisec since we
+ // refreshed
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
- refreshExecutors(); // Refresh executor stats to be used by selector
+ // Refresh executorInfo for all activeExecutors
+ refreshExecutors();
lastExecutorRefreshTime = currentTime;
currentContinuousFlowProcessed = 0;
}
@@ -1757,7 +1764,7 @@ public class ExecutorManager extends EventHandler implements
try {
dispatch(reference, exflow, selectedExecutor);
} catch (ExecutorManagerException e) {
- logger.debug(String.format(
+ logger.warn(String.format(
"Executor %s responded with exception for exec: %d",
selectedExecutor, exflow.getExecutionId()), e);
handleDispatchExceptionCase(reference, exflow, selectedExecutor,
@@ -1781,6 +1788,11 @@ public class ExecutorManager extends EventHandler implements
private void handleDispatchExceptionCase(ExecutionReference reference,
ExecutableFlow exflow, Executor lastSelectedExecutor,
Set<Executor> remainingExecutors) throws ExecutorManagerException {
+ logger
+ .info(String
+ .format(
+ "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
reference.setExecutor(null);
if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
@@ -1796,6 +1808,11 @@ public class ExecutorManager extends EventHandler implements
private void handleNoExecutorSelectedCase(ExecutionReference reference,
ExecutableFlow exflow) throws ExecutorManagerException {
+ logger
+ .info(String
+ .format(
+ "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
// Scenario: when dispatcher didn't assigned any executor
if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
@@ -1820,6 +1837,10 @@ public class ExecutorManager extends EventHandler implements
// move from flow to running flows
runningFlows.put(exflow.getExecutionId(),
new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
}
}
}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index f9850d6..ed6543f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -45,11 +45,12 @@ public class ExecutableFlowPriorityComparatorTest {
/* Helper method to create an ExecutableFlow from serialized description */
private ExecutableFlow createExecutableFlow(String flowName, int priority,
- long updateTime) throws IOException {
+ long updateTime, int executionId) throws IOException {
ExecutableFlow execFlow =
TestUtils.createExecutableFlow("exectest1", flowName);
execFlow.setUpdateTime(updateTime);
+ execFlow.setExecutionId(executionId);
if (priority > 0) {
execFlow.getExecutionOptions().getFlowParameters()
.put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
@@ -61,9 +62,9 @@ public class ExecutableFlowPriorityComparatorTest {
@Test
public void testExplicitlySpecifiedPriorities() throws IOException,
InterruptedException {
- ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3);
- ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3);
- ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3);
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3, 3);
ExecutionReference dummyRef = new ExecutionReference(0);
BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
@@ -82,9 +83,9 @@ public class ExecutableFlowPriorityComparatorTest {
@Test
public void testMixedSpecifiedPriorities() throws IOException,
InterruptedException {
- ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3);
- ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3);
- ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3);
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
ExecutionReference dummyRef = new ExecutionReference(0);
BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
@@ -100,15 +101,43 @@ public class ExecutableFlowPriorityComparatorTest {
}
/*
- * priority queue order when some priorities are equal, execution Id is used
- * in this case
+ * priority queue order when some priorities are equal, updatetime is used in
+ * this case
*/
@Test
public void testEqualPriorities() throws IOException, InterruptedException {
- ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1);
- ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
- ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3);
- ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4);
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow4, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+
+ /*
+ * priority queue order when some priorities and updatetime are equal,
+ * execution Id is used in this case
+ */
+ @Test
+ public void testEqualUpdateTimeAndPriority() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 2, 3);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
ExecutionReference dummyRef = new ExecutionReference(0);
BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =