azkaban-aplcache

Fix bug: queued flows included in active flows (#2043) Bug

12/3/2018 3:58:15 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index c8de86d..11ff709 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -68,7 +68,13 @@ public class FetchActiveFlowDao {
             + " Where ex.status NOT IN ("
             + Status.SUCCEEDED.getNumVal() + ", "
             + Status.KILLED.getNumVal() + ", "
-            + Status.FAILED.getNumVal() + ")";
+            + Status.FAILED.getNumVal() + ")"
+            // exclude queued flows that haven't been assigned yet -- this is the opposite of
+            // the condition in ExecutionFlowDao#FETCH_QUEUED_EXECUTABLE_FLOW
+            + " AND NOT ("
+            + "   ex.executor_id IS NULL"
+            + "   AND ex.status = " + Status.PREPARING.getNumVal()
+            + " )";
 
     @Override
     public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 5819fbf..412907a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -18,6 +18,7 @@ package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.project.JdbcProjectImpl;
@@ -29,6 +30,7 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
 import java.io.File;
+import java.io.IOException;
 import java.sql.SQLException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -307,34 +309,67 @@ public class ExecutionFlowDaoTest {
         .hasMessageContaining("non-existent execution");
   }
 
-
   @Test
   public void testFetchActiveFlowsExecutorAssigned() throws Exception {
 
-    // Upload flow1, executor assigned
-    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
-    this.executionFlowDao.uploadExecutableFlow(flow1);
     final Executor executor = this.executorDao.addExecutor("test", 1);
-    this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
 
-    // Upload flow2, executor not assigned
-    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
-    this.executionFlowDao.uploadExecutableFlow(flow2);
+    final ExecutableFlow flow1 = createExecutionAndAssign(Status.PREPARING, executor);
+    // flow2 is not assigned
+    final ExecutableFlow flow2 = createExecution(Status.PREPARING);
+    final ExecutableFlow flow3 = createExecutionAndAssign(Status.RUNNING, executor);
+    final ExecutableFlow flow4 = createExecutionAndAssign(Status.SUCCEEDED, executor);
 
-    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+    final Executor executor2 = this.executorDao.addExecutor("test2", 2);
+    // flow5 is assigned to an executor that is then removed
+    final ExecutableFlow flow5 = createExecutionAndAssign(Status.RUNNING, executor2);
+    this.executorDao.removeExecutor(executor2.getHost(), executor2.getPort());
+
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
         this.fetchActiveFlowDao.fetchActiveFlows();
 
-    assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
-    assertThat(activeFlows1.get(flow1.getExecutionId()).getFirst().getExecutor().isPresent())
-        .isTrue();
-    assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isTrue();
-    assertThat(activeFlows1.get(flow2.getExecutionId()).getFirst().getExecutor().isPresent())
-        .isFalse();
+    assertFound(activeFlows, flow1, true);
+    assertNotFound(activeFlows, flow2, "Returned a queued execution");
+    assertFound(activeFlows, flow3, true);
+    assertNotFound(activeFlows, flow4, "Returned an execution with a finished status");
+    assertFound(activeFlows, flow5, false);
+
     final ExecutableFlow flow1Result =
-        activeFlows1.get(flow1.getExecutionId()).getSecond();
+        activeFlows.get(flow1.getExecutionId()).getSecond();
     assertTwoFlowSame(flow1Result, flow1);
   }
 
+  private void assertNotFound(
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows,
+      final ExecutableFlow flow, final String failMessage) {
+    assertThat(activeFlows.containsKey(flow.getExecutionId())).withFailMessage(failMessage)
+        .isFalse();
+  }
+
+  private void assertFound(final Map<Integer, Pair<ExecutionReference,
+      ExecutableFlow>> activeFlows, final ExecutableFlow flow, final boolean executorPresent) {
+    assertThat(activeFlows.containsKey(flow.getExecutionId())).isTrue();
+    assertThat(activeFlows.get(flow.getExecutionId()).getFirst().getExecutor().isPresent())
+        .isEqualTo(executorPresent);
+  }
+
+  private ExecutableFlow createExecutionAndAssign(final Status status, final Executor executor)
+      throws Exception {
+    final ExecutableFlow flow = createExecution(status);
+    this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
+    return flow;
+  }
+
+  private ExecutableFlow createExecution(final Status status)
+      throws IOException, ExecutorManagerException {
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    this.executionFlowDao.uploadExecutableFlow(flow);
+    assertEquals(Status.PREPARING, flow.getStatus());
+    flow.setStatus(status);
+    this.executionFlowDao.updateExecutableFlow(flow);
+    return flow;
+  }
+
   @Test
   public void testFetchActiveFlowsStatusChanged() throws Exception {
     final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
diff --git a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
index 3093b97..1307a8f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/FetchActiveFlowDaoTest.java
@@ -15,6 +15,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+/**
+ * Also @see ExecutionFlowDaoTest - DB operations of FetchActiveFlowDao are tested there.
+ */
 public class FetchActiveFlowDaoTest {
 
   private ResultSet rs;