diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 91cc7a8..f744c47 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -95,16 +95,19 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
Status.SUCCEEDED.ordinal(),
Constants.UNASSIGNED_EXEC_ID);
- private static final String SELECT_RECENTLY_FINISHED =
+ private static final String SELECT_RECENTLY_FINISHED = String.format(
"SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
- + "cancelleation_cause,project_id,project_version,flow_id,flow_version,"
- + "project_json, flow_exec_id \n"
+ + "cancelleation_cause,project_id,"
+ + "project_version,flow_id,flow_version,project_json, flow_exec_id \n"
+ "FROM execution_dependencies JOIN (\n"
- + "SELECT trigger_instance_id FROM execution_dependencies WHERE trigger_instance_id not in (\n"
- + "SELECT distinct(trigger_instance_id) FROM execution_dependencies WHERE dep_status = 0 or dep_status = 4)\n"
+ + "SELECT distinct(trigger_instance_id) FROM execution_dependencies WHERE dep_status ="
+ + " %s or dep_status = %s\n"
+ "GROUP BY trigger_instance_id\n"
- + "ORDER BY min(starttime) desc limit %s) temp on execution_dependencies"
- + ".trigger_instance_id in (temp.trigger_instance_id);";
+ + " limit %%s) temp on execution_dependencies"
+ + ".trigger_instance_id in (temp.trigger_instance_id);",
+ Status.SUCCEEDED.ordinal(),
+ Status.CANCELLED.ordinal());
+
private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
+ "flow_exec_id "
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 377be3f..f72f63b 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -36,6 +36,7 @@ import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
@@ -209,6 +210,18 @@ public class FlowTriggerInstanceLoaderTest {
}
}
+ private void finalizeTriggerInstanceWithCancelling(final TriggerInstance triggerInst) {
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ depInst.setStatus(Status.CANCELLING);
+ }
+ }
+
+ private void shuffleAndUpload(final List<TriggerInstance> all) {
+ final List<TriggerInstance> shuffled = new ArrayList<>(all);
+ Collections.shuffle(shuffled);
+ shuffled.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+ }
+
@Test
public void testGetIncompleteTriggerInstancesReturnsEmpty() {
final List<TriggerInstance> all = new ArrayList<>();
@@ -221,9 +234,7 @@ public class FlowTriggerInstanceLoaderTest {
finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
}
}
-
- all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
-
+ this.shuffleAndUpload(all);
final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
.getIncompleteTriggerInstances());
all.sort(Comparator.comparing(TriggerInstance::getId));
@@ -246,7 +257,7 @@ public class FlowTriggerInstanceLoaderTest {
// been started
finalizeTriggerInstanceWithSuccess(allInstances.get(2), -1);
- allInstances.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+ this.shuffleAndUpload(allInstances);
final List<TriggerInstance> expected = allInstances.subList(2, allInstances.size());
final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
@@ -297,7 +308,7 @@ public class FlowTriggerInstanceLoaderTest {
.flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
}
- all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+ this.shuffleAndUpload(all);
final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
.getRecentlyFinished(10);
@@ -308,7 +319,7 @@ public class FlowTriggerInstanceLoaderTest {
public void testGetRecentlyFinished() {
final List<TriggerInstance> all = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 15; i++) {
all.add(this.createTriggerInstance(this.flowTrigger, this
.flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+ i * 10000));
@@ -316,10 +327,12 @@ public class FlowTriggerInstanceLoaderTest {
finalizeTriggerInstanceWithCancelled(all.get(i));
} else if (i <= 6) {
finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+ } else if (i <= 9) {
+ finalizeTriggerInstanceWithCancelling(all.get(i));
}
}
- all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+ this.shuffleAndUpload(all);
final List<TriggerInstance> expected = all.subList(0, 7);
expected.sort(Comparator.comparing(TriggerInstance::getStartTime));