azkaban-aplcache

[DO NOT MERGE UNTIL DB IS UPDATED] Fetch active/queued flows

6/6/2017 6:18:45 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index c0bf5c3..2a26c1a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -20,6 +20,7 @@ public class ExecutionReference {
 
   private final int execId;
   private Executor executor;
+  //Todo jamiesjc: deprecate updateTime in ExecutionReference class gradually.
   private long updateTime;
   private long nextCheckTime = -1;
   private int numErrors = 0;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index b1e315b..a3f761f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1034,8 +1034,11 @@ public class ExecutorManager extends EventHandler implements
           try {
             dispatch(reference, exflow, choosenExecutor);
           } catch (ExecutorManagerException e) {
-            executorLoader.removeActiveExecutableReference(reference
-              .getExecId());
+            // When flow dispatch fails, should update the flow status
+            // to FAILED in execution_flows DB table as well. Currently
+            // this logic is only implemented in multiExecutorMode but
+            // missed in single executor case.
+            finalizeFlows(exflow);
             throw e;
           }
         }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 4bc8e68..1f31a0a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -1331,27 +1331,23 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
     // Select queued unassigned flows
     private static String FETCH_QUEUED_EXECUTABLE_FLOW =
-      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
-        + " ax.update_time axUpdateTime FROM execution_flows ex"
-        + " INNER JOIN"
-        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
-        + " Where ex.executor_id is NULL";
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows"
+            + " Where executor_id is NULL AND status = "
+            + Status.PREPARING.getNumVal();
 
     @Override
     public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
       throws SQLException {
       if (!rs.next()) {
-        return Collections
-          .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+        return Collections.emptyList();
       }
 
       List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
-        new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+        new ArrayList<>();
       do {
         int id = rs.getInt(1);
         int encodingType = rs.getInt(2);
         byte[] data = rs.getBytes(3);
-        long updateTime = rs.getLong(4);
 
         if (data == null) {
           logger.error("Found a flow with empty data blob exec_id: " + id);
@@ -1373,10 +1369,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
             ExecutableFlow exFlow =
               ExecutableFlow.createExecutableFlowFromObject(flowObj);
             ExecutionReference ref = new ExecutionReference(id);
-            ref.setUpdateTime(updateTime);
-
-            execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
-              exFlow));
+            execFlows.add(new Pair<>(ref, exFlow));
           } catch (IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
@@ -1438,19 +1431,20 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     // Select running and executor assigned flows
     private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
       "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
-        + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+        + "et.port port, et.id executorId, et.active executorStatus"
         + " FROM execution_flows ex"
         + " INNER JOIN "
-        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
-        + " INNER JOIN "
-        + " executors et ON ex.executor_id = et.id";
+        + " executors et ON ex.executor_id = et.id"
+        + " Where ex.status NOT IN ("
+        + Status.SUCCEEDED.getNumVal() + ", "
+        + Status.KILLED.getNumVal() + ", "
+        + Status.FAILED.getNumVal() + ")";
 
     @Override
     public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
         ResultSet rs) throws SQLException {
       if (!rs.next()) {
-        return Collections
-            .<Integer, Pair<ExecutionReference, ExecutableFlow>> emptyMap();
+        return Collections.emptyMap();
       }
 
       Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
@@ -1461,9 +1455,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         byte[] data = rs.getBytes(3);
         String host = rs.getString(4);
         int port = rs.getInt(5);
-        long updateTime = rs.getLong(6);
-        int executorId = rs.getInt(7);
-        boolean executorStatus = rs.getBoolean(8);
+        int executorId = rs.getInt(6);
+        boolean executorStatus = rs.getBoolean(7);
 
         if (data == null) {
           execFlows.put(id, null);
@@ -1486,10 +1479,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
             Executor executor = new Executor(executorId, host, port, executorStatus);
             ExecutionReference ref = new ExecutionReference(id, executor);
-            ref.setUpdateTime(updateTime);
-
-            execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
-                exFlow));
+            execFlows.put(id, new Pair<>(ref, exFlow));
           } catch (IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
           }
@@ -1504,13 +1494,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
     private static String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
         "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
-            + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+            + "et.port port, et.id executorId, et.active executorStatus"
             + " FROM execution_flows ex"
             + " INNER JOIN "
-            + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
-            + " INNER JOIN "
             + " executors et ON ex.executor_id = et.id"
-            + " WHERE ax.exec_id = ?";
+            + " Where ex.exec_id = ? AND ex.status NOT IN ("
+            + Status.SUCCEEDED.getNumVal() + ", "
+            + Status.KILLED.getNumVal() + ", "
+            + Status.FAILED.getNumVal() + ")";
 
     @Override
     public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
@@ -1527,9 +1518,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         byte[] data = rs.getBytes(3);
         String host = rs.getString(4);
         int port = rs.getInt(5);
-        long updateTime = rs.getLong(6);
-        int executorId = rs.getInt(7);
-        boolean executorStatus = rs.getBoolean(8);
+        int executorId = rs.getInt(6);
+        boolean executorStatus = rs.getBoolean(7);
 
         if (data == null) {
           logger.error("Found a flow with empty data blob exec_id: " + id);
@@ -1549,8 +1539,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
             Executor executor = new Executor(executorId, host, port, executorStatus);
             ExecutionReference ref = new ExecutionReference(id, executor);
-            ref.setUpdateTime(updateTime);
-
             execFlows.add(new Pair<>(ref, exFlow));
           } catch (IOException e) {
             throw new SQLException("Error retrieving flow data " + id, e);
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index ea0d43d..6c1138a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -471,18 +471,17 @@ public class JdbcExecutorLoaderTest {
     int port = 12345;
     Executor executor = loader.addExecutor(host, port);
 
+    // When a flow is assigned an executor, it is no longer in queued state
     ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
-    // only completed flows
     Assert.assertTrue(queuedFlows.isEmpty());
 
+    // When flow status is finished, it is no longer in queued state
     ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
-    loader.uploadExecutableFlow(flow);
-    loader.assignExecutor(executor.getId(), flow.getExecutionId());
-    ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
-    loader.addActiveExecutableReference(ref);
-    // only running and completed flows
+    loader.uploadExecutableFlow(flow2);
+    flow2.setStatus(Status.SUCCEEDED);
+    loader.updateExecutableFlow(flow2);
     Assert.assertTrue(queuedFlows.isEmpty());
   }
 
@@ -495,29 +494,20 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
-      new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
 
     ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow2);
 
-    ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
-    loader.addActiveExecutableReference(ref2);
-    ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
-    loader.addActiveExecutableReference(ref);
-
     List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader.fetchQueuedFlows();
     Assert.assertEquals(2, fetchedQueuedFlows.size());
     Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
     Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
 
-    Assert.assertEquals(ref.getExecId(), fetchedFlow1.getFirst().getExecId());
     Assert.assertEquals(flow.getExecutionId(), fetchedFlow1.getSecond().getExecutionId());
     Assert.assertEquals(flow.getFlowId(), fetchedFlow1.getSecond().getFlowId());
     Assert.assertEquals(flow.getProjectId(), fetchedFlow1.getSecond().getProjectId());
-    Assert.assertEquals(ref2.getExecId(), fetchedFlow2.getFirst().getExecId());
     Assert.assertEquals(flow2.getExecutionId(), fetchedFlow2.getSecond().getExecutionId());
     Assert.assertEquals(flow2.getFlowId(), fetchedFlow2.getSecond().getFlowId());
     Assert.assertEquals(flow2.getProjectId(), fetchedFlow2.getSecond().getProjectId());
@@ -785,32 +775,28 @@ public class JdbcExecutorLoaderTest {
   }
 
   @Test
-  public void testActiveReference() throws Exception {
+  public void testFetchActiveFlowsExecutorAssigned() throws Exception {
     if (!isTestSetup()) {
       return;
     }
 
+    // Upload flow1, executor assigned
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
     Executor executor = loader.addExecutor("test", 1);
     loader.assignExecutor(executor.getId(), flow1.getExecutionId());
-    ExecutionReference ref1 =
-        new ExecutionReference(flow1.getExecutionId(), executor);
-    loader.addActiveExecutableReference(ref1);
 
-    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    // Upload flow2, executor not assigned
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow2);
-    loader.assignExecutor(executor.getId(), flow2.getExecutionId());
-    ExecutionReference ref2 =
-        new ExecutionReference(flow2.getExecutionId(), executor);
-    loader.addActiveExecutableReference(ref2);
-
-    ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
-    loader.uploadExecutableFlow(flow3);
 
     Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
         loader.fetchActiveFlows();
+
+    Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
+    Assert.assertFalse(activeFlows1.containsKey(flow2.getExecutionId()));
+
     ExecutableFlow flow1Result =
         activeFlows1.get(flow1.getExecutionId()).getSecond();
     Assert.assertNotNull(flow1Result);
@@ -824,28 +810,64 @@ public class JdbcExecutorLoaderTest {
     Assert.assertEquals(flow1.getVersion(), flow1Result.getVersion());
     Assert.assertEquals(flow1.getExecutionOptions().getFailureAction(),
         flow1Result.getExecutionOptions().getFailureAction());
+  }
+
+  @Test
+  public void testFetchActiveFlowsStatusChanged() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    // Flow status is PREPARING when uploaded, should be in active flows
+    loader.uploadExecutableFlow(flow1);
+    Executor executor = loader.addExecutor("test", 1);
+    loader.assignExecutor(executor.getId(), flow1.getExecutionId());
+
+    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
+        loader.fetchActiveFlows();
+    Assert.assertTrue(activeFlows.containsKey(flow1.getExecutionId()));
+
+    // When flow status becomes SUCCEEDED/KILLED/FAILED, it should not be in active state
+    flow1.setStatus(Status.SUCCEEDED);
+    loader.updateExecutableFlow(flow1);
+    activeFlows = loader.fetchActiveFlows();
+    Assert.assertFalse(activeFlows.containsKey(flow1.getExecutionId()));
 
-    ExecutableFlow flow1Result2 =
-        activeFlows1.get(flow2.getExecutionId()).getSecond();
-    Assert.assertNotNull(flow1Result2);
-    Assert.assertTrue(flow2 != flow1Result2);
-    Assert.assertEquals(flow2.getExecutionId(), flow1Result2.getExecutionId());
-    Assert.assertEquals(flow2.getEndTime(), flow1Result2.getEndTime());
-    Assert.assertEquals(flow2.getStartTime(), flow1Result2.getStartTime());
-    Assert.assertEquals(flow2.getSubmitTime(), flow1Result2.getSubmitTime());
-    Assert.assertEquals(flow2.getFlowId(), flow1Result2.getFlowId());
-    Assert.assertEquals(flow2.getProjectId(), flow1Result2.getProjectId());
-    Assert.assertEquals(flow2.getVersion(), flow1Result2.getVersion());
-    Assert.assertEquals(flow2.getExecutionOptions().getFailureAction(),
-        flow1Result2.getExecutionOptions().getFailureAction());
-
-    loader.removeActiveExecutableReference(flow2.getExecutionId());
-    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows2 =
+    flow1.setStatus(Status.KILLED);
+    loader.updateExecutableFlow(flow1);
+    activeFlows = loader.fetchActiveFlows();
+    Assert.assertFalse(activeFlows.containsKey(flow1.getExecutionId()));
+
+    flow1.setStatus(Status.FAILED);
+    loader.updateExecutableFlow(flow1);
+    activeFlows = loader.fetchActiveFlows();
+    Assert.assertFalse(activeFlows.containsKey(flow1.getExecutionId()));
+  }
+
+  @Test
+  public void testFetchActiveFlowsReferenceChanged() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow1);
+    Executor executor = loader.addExecutor("test", 1);
+    loader.assignExecutor(executor.getId(), flow1.getExecutionId());
+    ExecutionReference ref1 =
+        new ExecutionReference(flow1.getExecutionId(), executor);
+    loader.addActiveExecutableReference(ref1);
+
+    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
         loader.fetchActiveFlows();
+    Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
 
-    Assert.assertTrue(activeFlows2.containsKey(flow1.getExecutionId()));
-    Assert.assertFalse(activeFlows2.containsKey(flow3.getExecutionId()));
-    Assert.assertFalse(activeFlows2.containsKey(flow2.getExecutionId()));
+    // Verify active flows are not fetched from active_executing_flows DB table any more
+    loader.removeActiveExecutableReference(flow1.getExecutionId());
+    Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
   }
 
   @Test
@@ -859,8 +881,6 @@ public class JdbcExecutorLoaderTest {
     loader.uploadExecutableFlow(flow1);
     Executor executor = loader.addExecutor("test", 1);
     loader.assignExecutor(executor.getId(), flow1.getExecutionId());
-    ExecutionReference ref1 = new ExecutionReference(flow1.getExecutionId(), executor);
-    loader.addActiveExecutableReference(ref1);
 
     Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
         loader.fetchActiveFlowByExecId(flow1.getExecutionId());
@@ -868,8 +888,6 @@ public class JdbcExecutorLoaderTest {
     ExecutionReference execRef1 = activeFlow1.getFirst();
     ExecutableFlow execFlow1 = activeFlow1.getSecond();
     Assert.assertNotNull(execRef1);
-    Assert.assertEquals(ref1.getExecId(), execRef1.getExecId());
-    Assert.assertEquals(ref1.getExecutor(), execRef1.getExecutor());
     Assert.assertNotNull(execFlow1);
     Assert.assertEquals(flow1.getExecutionId(), execFlow1.getExecutionId());
     Assert.assertEquals(flow1.getFlowId(), execFlow1.getFlowId());