azkaban-aplcache

recently finished trigger instance fix (#1658) Recently

2/22/2018 10:40:29 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index f744c47..dc3986e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -100,10 +100,10 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
           + "cancelleation_cause,project_id,"
           + "project_version,flow_id,flow_version,project_json, flow_exec_id \n"
           + "FROM execution_dependencies JOIN (\n"
-          + "SELECT distinct(trigger_instance_id)  FROM execution_dependencies WHERE dep_status ="
-          + " %s or dep_status = %s\n"
-          + "GROUP BY trigger_instance_id\n"
-          + " limit %%s) temp on execution_dependencies"
+          + "SELECT distinct(trigger_instance_id), max(endtime) FROM execution_dependencies "
+          + "WHERE dep_status = %s or dep_status = %s\n"
+          + "GROUP BY trigger_instance_id ORDER BY max(endtime) DESC \n"
+          + " limit %%s ) temp on execution_dependencies"
           + ".trigger_instance_id in (temp.trigger_instance_id);",
       Status.SUCCEEDED.ordinal(),
       Status.CANCELLED.ordinal());
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index f72f63b..fe98da0 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -199,6 +199,7 @@ public class FlowTriggerInstanceLoaderTest {
     for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
       depInst.setStatus(Status.SUCCEEDED);
       depInst.getTriggerInstance().setFlowExecId(associateFlowExecId);
+      depInst.setEndTime(new Date());
     }
   }
 
@@ -316,7 +317,7 @@ public class FlowTriggerInstanceLoaderTest {
   }
 
   @Test
-  public void testGetRecentlyFinished() {
+  public void testGetRecentlyFinished() throws InterruptedException {
 
     final List<TriggerInstance> all = new ArrayList<>();
     for (int i = 0; i < 15; i++) {
@@ -330,16 +331,30 @@ public class FlowTriggerInstanceLoaderTest {
       } else if (i <= 9) {
         finalizeTriggerInstanceWithCancelling(all.get(i));
       }
+      //sleep for a while to ensure endtime is different for each trigger instance
+      Thread.sleep(1000);
     }
 
     this.shuffleAndUpload(all);
 
-    final List<TriggerInstance> expected = all.subList(0, 7);
+    final List<TriggerInstance> finished = all.subList(0, 7);
+    finished.sort((o1, o2) -> -1 * (o1.getEndTime().compareTo(o2.getEndTime())));
+
+    List<TriggerInstance> expected = new ArrayList<>(finished);
     expected.sort(Comparator.comparing(TriggerInstance::getStartTime));
 
-    final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
+    Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
         .getRecentlyFinished(10);
     assertTwoTriggerInstanceListsEqual(new ArrayList<>(recentlyFinished), expected, true, true);
+
+    expected = new ArrayList<>(finished.subList(0, 3));
+    expected.sort(Comparator.comparing(TriggerInstance::getStartTime));
+    recentlyFinished = this.triggerInstLoader.getRecentlyFinished(3);
+    assertTwoTriggerInstanceListsEqual(new ArrayList<>(recentlyFinished), expected, true, true);
+
+    expected = new ArrayList<>(finished.subList(0, 1));
+    recentlyFinished = this.triggerInstLoader.getRecentlyFinished(1);
+    assertTwoTriggerInstanceListsEqual(new ArrayList<>(recentlyFinished), expected, true, true);
   }
 
   @After