azkaban-aplcache

exclude trigger instances in cancelling status from recently

2/21/2018 10:43:25 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 91cc7a8..f744c47 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -95,16 +95,19 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
               Status.SUCCEEDED.ordinal(),
               Constants.UNASSIGNED_EXEC_ID);
 
-  private static final String SELECT_RECENTLY_FINISHED =
+  private static final String SELECT_RECENTLY_FINISHED = String.format(
       "SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
-          + "cancelleation_cause,project_id,project_version,flow_id,flow_version,"
-          + "project_json, flow_exec_id \n"
+          + "cancelleation_cause,project_id,"
+          + "project_version,flow_id,flow_version,project_json, flow_exec_id \n"
           + "FROM execution_dependencies JOIN (\n"
-          + "SELECT trigger_instance_id FROM execution_dependencies WHERE trigger_instance_id not in (\n"
-          + "SELECT distinct(trigger_instance_id)  FROM execution_dependencies WHERE dep_status =  0 or dep_status = 4)\n"
+          + "SELECT distinct(trigger_instance_id)  FROM execution_dependencies WHERE dep_status ="
+          + " %s or dep_status = %s\n"
           + "GROUP BY trigger_instance_id\n"
-          + "ORDER BY  min(starttime) desc limit %s) temp on execution_dependencies"
-          + ".trigger_instance_id in (temp.trigger_instance_id);";
+          + " limit %%s) temp on execution_dependencies"
+          + ".trigger_instance_id in (temp.trigger_instance_id);",
+      Status.SUCCEEDED.ordinal(),
+      Status.CANCELLED.ordinal());
+
 
   private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
       + "flow_exec_id "
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 377be3f..f72f63b 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -36,6 +36,7 @@ import java.io.File;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.Iterator;
@@ -209,6 +210,18 @@ public class FlowTriggerInstanceLoaderTest {
     }
   }
 
+  private void finalizeTriggerInstanceWithCancelling(final TriggerInstance triggerInst) {
+    for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+      depInst.setStatus(Status.CANCELLING);
+    }
+  }
+
+  private void shuffleAndUpload(final List<TriggerInstance> all) {
+    final List<TriggerInstance> shuffled = new ArrayList<>(all);
+    Collections.shuffle(shuffled);
+    shuffled.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+  }
+
   @Test
   public void testGetIncompleteTriggerInstancesReturnsEmpty() {
     final List<TriggerInstance> all = new ArrayList<>();
@@ -221,9 +234,7 @@ public class FlowTriggerInstanceLoaderTest {
         finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
       }
     }
-
-    all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
-
+    this.shuffleAndUpload(all);
     final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
         .getIncompleteTriggerInstances());
     all.sort(Comparator.comparing(TriggerInstance::getId));
@@ -246,7 +257,7 @@ public class FlowTriggerInstanceLoaderTest {
     // been started
     finalizeTriggerInstanceWithSuccess(allInstances.get(2), -1);
 
-    allInstances.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+    this.shuffleAndUpload(allInstances);
 
     final List<TriggerInstance> expected = allInstances.subList(2, allInstances.size());
     final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
@@ -297,7 +308,7 @@ public class FlowTriggerInstanceLoaderTest {
           .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
     }
 
-    all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+    this.shuffleAndUpload(all);
 
     final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
         .getRecentlyFinished(10);
@@ -308,7 +319,7 @@ public class FlowTriggerInstanceLoaderTest {
   public void testGetRecentlyFinished() {
 
     final List<TriggerInstance> all = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 15; i++) {
       all.add(this.createTriggerInstance(this.flowTrigger, this
           .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
           + i * 10000));
@@ -316,10 +327,12 @@ public class FlowTriggerInstanceLoaderTest {
         finalizeTriggerInstanceWithCancelled(all.get(i));
       } else if (i <= 6) {
         finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+      } else if (i <= 9) {
+        finalizeTriggerInstanceWithCancelling(all.get(i));
       }
     }
 
-    all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+    this.shuffleAndUpload(all);
 
     final List<TriggerInstance> expected = all.subList(0, 7);
     expected.sort(Comparator.comparing(TriggerInstance::getStartTime));