diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index c8de86d..11ff709 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -68,7 +68,13 @@ public class FetchActiveFlowDao {
+ " Where ex.status NOT IN ("
+ Status.SUCCEEDED.getNumVal() + ", "
+ Status.KILLED.getNumVal() + ", "
- + Status.FAILED.getNumVal() + ")";
+ + Status.FAILED.getNumVal() + ")"
+ // exclude queued flows that haven't been assigned yet -- this is the opposite of
+ // the condition in ExecutionFlowDao#FETCH_QUEUED_EXECUTABLE_FLOW
+ + " AND NOT ("
+ + " ex.executor_id IS NULL"
+ + " AND ex.status = " + Status.PREPARING.getNumVal()
+ + " )";
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 5819fbf..412907a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -18,6 +18,7 @@ package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
import azkaban.db.DatabaseOperator;
import azkaban.project.JdbcProjectImpl;
@@ -29,6 +30,7 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import java.io.File;
+import java.io.IOException;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -307,34 +309,67 @@ public class ExecutionFlowDaoTest {
.hasMessageContaining("non-existent execution");
}
-
@Test
public void testFetchActiveFlowsExecutorAssigned() throws Exception {
- // Upload flow1, executor assigned
- final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
- this.executionFlowDao.uploadExecutableFlow(flow1);
final Executor executor = this.executorDao.addExecutor("test", 1);
- this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
- // Upload flow2, executor not assigned
- final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
- this.executionFlowDao.uploadExecutableFlow(flow2);
+ final ExecutableFlow flow1 = createExecutionAndAssign(Status.PREPARING, executor);
+ // flow2 is not assigned
+ final ExecutableFlow flow2 = createExecution(Status.PREPARING);
+ final ExecutableFlow flow3 = createExecutionAndAssign(Status.RUNNING, executor);
+ final ExecutableFlow flow4 = createExecutionAndAssign(Status.SUCCEEDED, executor);
- final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+ final Executor executor2 = this.executorDao.addExecutor("test2", 2);
+ // flow5 is assigned to an executor that is then removed
+ final ExecutableFlow flow5 = createExecutionAndAssign(Status.RUNNING, executor2);
+ this.executorDao.removeExecutor(executor2.getHost(), executor2.getPort());
+
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
this.fetchActiveFlowDao.fetchActiveFlows();
- assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
- assertThat(activeFlows1.get(flow1.getExecutionId()).getFirst().getExecutor().isPresent())
- .isTrue();
- assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isTrue();
- assertThat(activeFlows1.get(flow2.getExecutionId()).getFirst().getExecutor().isPresent())
- .isFalse();
+ assertFound(activeFlows, flow1, true);
+ assertNotFound(activeFlows, flow2, "Returned a queued execution");
+ assertFound(activeFlows, flow3, true);
+ assertNotFound(activeFlows, flow4, "Returned an execution with a finished status");
+ assertFound(activeFlows, flow5, false);
+
final ExecutableFlow flow1Result =
- activeFlows1.get(flow1.getExecutionId()).getSecond();
+ activeFlows.get(flow1.getExecutionId()).getSecond();
assertTwoFlowSame(flow1Result, flow1);
}
+ private void assertNotFound(
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows,
+ final ExecutableFlow flow, final String failMessage) {
+ assertThat(activeFlows.containsKey(flow.getExecutionId())).withFailMessage(failMessage)
+ .isFalse();
+ }
+
+ private void assertFound(final Map<Integer, Pair<ExecutionReference,
+ ExecutableFlow>> activeFlows, final ExecutableFlow flow, final boolean executorPresent) {
+ assertThat(activeFlows.containsKey(flow.getExecutionId())).isTrue();
+ assertThat(activeFlows.get(flow.getExecutionId()).getFirst().getExecutor().isPresent())
+ .isEqualTo(executorPresent);
+ }
+
+ private ExecutableFlow createExecutionAndAssign(final Status status, final Executor executor)
+ throws Exception {
+ final ExecutableFlow flow = createExecution(status);
+ this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
+ return flow;
+ }
+
+ private ExecutableFlow createExecution(final Status status)
+ throws IOException, ExecutorManagerException {
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ this.executionFlowDao.uploadExecutableFlow(flow);
+ assertEquals(Status.PREPARING, flow.getStatus());
+ flow.setStatus(status);
+ this.executionFlowDao.updateExecutableFlow(flow);
+ return flow;
+ }
+
@Test
public void testFetchActiveFlowsStatusChanged() throws Exception {
final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");