azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java 5(+5 -0)
Details
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
index b8c670d..5f7939e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
@@ -52,6 +52,11 @@ public interface FlowTriggerInstanceLoader {
*/
Collection<TriggerInstance> getRecentlyFinished(int limit);
+ /**
+ * Retrieve running trigger instances.
+ */
+ Collection<TriggerInstance> getRunning();
+
TriggerInstance getTriggerInstanceById(String triggerInstanceId);
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 5867593..150274c 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -94,6 +94,12 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
Status.SUCCEEDED.ordinal(),
Constants.UNASSIGNED_EXEC_ID);
+ private static final String SELECT_ALL_RUNNING_EXECUTIONS =
+ String.format("SELECT %s FROM %s WHERE dep_status = %s or dep_status = %s",
+ StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+ DEPENDENCY_EXECUTION_TABLE,
+ Status.RUNNING.ordinal(), Status.CANCELLING.ordinal());
+
private static final String SELECT_RECENTLY_FINISHED = String.format(
"SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
+ "cancelleation_cause,project_id,"
@@ -258,6 +264,19 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
return Collections.emptyList();
}
+ @Override
+ public Collection<TriggerInstance> getRunning() {
+ try {
+ //todo chengren311:
+ // 1. add index for the execution_dependencies table to accelerate selection.
+ // 2. implement purging mechanism to keep reasonable amount of historical executions in db.
+ return this.dbOperator.query(SELECT_ALL_RUNNING_EXECUTIONS, new TriggerInstanceHandler());
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ return Collections.emptyList();
+ }
+
/**
* Retrieve a trigger instance given an instance id. Flow trigger properties will also be
* populated into the returned trigger instance.
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 3447d13..3eefcfc 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -163,16 +162,7 @@ public class FlowTriggerService {
* @return the list of running trigger instances
*/
public Collection<TriggerInstance> getRunningTriggers() {
- final Future future = this.executorService.submit(
- (Callable) () -> FlowTriggerService.this.runningTriggers);
-
- List<TriggerInstance> triggerInstanceList = new ArrayList<>();
- try {
- triggerInstanceList = (List<TriggerInstance>) future.get();
- } catch (final Exception ex) {
- logger.error("error in getting running triggers", ex);
- }
- return triggerInstanceList;
+ return this.flowTriggerInstanceLoader.getRunning();
}
/**
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 37f796e..7e8e205 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -264,6 +264,51 @@ public class FlowTriggerInstanceLoaderTest {
assertTwoTriggerInstanceListsEqual(actual, expected, false, false);
}
+ @Test
+ public void testGetRunningTriggerInstancesReturnsEmpty() throws InterruptedException {
+ final List<TriggerInstance> all = new ArrayList<>();
+ for (int i = 0; i < 15; i++) {
+ all.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+ + i * 10000));
+ finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+ }
+
+ this.shuffleAndUpload(all);
+
+ final Collection<TriggerInstance> running = this.triggerInstLoader.getRunning();
+ assertThat(running).isEmpty();
+ }
+
+ @Test
+ public void testGetRunningTriggerInstances() throws InterruptedException {
+ final List<TriggerInstance> all = new ArrayList<>();
+ for (int i = 0; i < 15; i++) {
+ all.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+ + i * 10000));
+ if (i <= 3) {
+ finalizeTriggerInstanceWithCancelled(all.get(i));
+ } else if (i <= 6) {
+ finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+ } else if (i <= 9) {
+ finalizeTriggerInstanceWithCancelling(all.get(i));
+ }
+ //sleep for a while to ensure endtime is different for each trigger instance
+ Thread.sleep(1000);
+ }
+
+ this.shuffleAndUpload(all);
+
+ final List<TriggerInstance> finished = all.subList(7, all.size());
+
+ final List<TriggerInstance> expected = new ArrayList<>(finished);
+ expected.sort(Comparator.comparing(TriggerInstance::getStartTime));
+
+ final Collection<TriggerInstance> running = this.triggerInstLoader.getRunning();
+ assertTwoTriggerInstanceListsEqual(new ArrayList<>(running), expected, true, true);
+ }
+
private void assertTwoTriggerInstanceListsEqual(final List<TriggerInstance> actual,
final List<TriggerInstance> expected, final boolean ignoreFlowTrigger,
final boolean keepOriginalOrder) {
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
index d8c2999..66d4ce7 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -86,6 +86,17 @@ public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader
}
@Override
+ public Collection<TriggerInstance> getRunning() {
+ final List<TriggerInstance> res = new ArrayList<>();
+ for (final TriggerInstance inst : this.triggerInstances) {
+ if (!Status.isDone(inst.getStatus())) {
+ res.add(inst);
+ }
+ }
+ return res;
+ }
+
+ @Override
public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
for (final TriggerInstance inst : this.triggerInstances) {
if (inst.getId().equals(triggerInstanceId)) {