azkaban-aplcache

retrieve running trigger from db instead of cache in flow trigger

3/1/2018 6:48:59 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
index b8c670d..5f7939e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
@@ -52,6 +52,11 @@ public interface FlowTriggerInstanceLoader {
    */
   Collection<TriggerInstance> getRecentlyFinished(int limit);
 
+  /**
+   * Retrieve running trigger instances.
+   */
+  Collection<TriggerInstance> getRunning();
+
   TriggerInstance getTriggerInstanceById(String triggerInstanceId);
 
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 5867593..150274c 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -94,6 +94,12 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
               Status.SUCCEEDED.ordinal(),
               Constants.UNASSIGNED_EXEC_ID);
 
+  private static final String SELECT_ALL_RUNNING_EXECUTIONS =
+      String.format("SELECT %s FROM %s WHERE dep_status = %s or dep_status = %s",
+          StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+          DEPENDENCY_EXECUTION_TABLE,
+          Status.RUNNING.ordinal(), Status.CANCELLING.ordinal());
+
   private static final String SELECT_RECENTLY_FINISHED = String.format(
       "SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
           + "cancelleation_cause,project_id,"
@@ -258,6 +264,19 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
     return Collections.emptyList();
   }
 
+  @Override
+  public Collection<TriggerInstance> getRunning() {
+    try {
+      //todo chengren311:
+      // 1. add index for the execution_dependencies table to accelerate selection.
+      // 2. implement purging mechanism to keep reasonable amount of historical executions in db.
+      return this.dbOperator.query(SELECT_ALL_RUNNING_EXECUTIONS, new TriggerInstanceHandler());
+    } catch (final SQLException ex) {
+      handleSQLException(ex);
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Retrieve a trigger instance given an instance id. Flow trigger properties will also be
    * populated into the returned trigger instance.
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 3447d13..3eefcfc 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -163,16 +162,7 @@ public class FlowTriggerService {
    * @return the list of running trigger instances
    */
   public Collection<TriggerInstance> getRunningTriggers() {
-    final Future future = this.executorService.submit(
-        (Callable) () -> FlowTriggerService.this.runningTriggers);
-
-    List<TriggerInstance> triggerInstanceList = new ArrayList<>();
-    try {
-      triggerInstanceList = (List<TriggerInstance>) future.get();
-    } catch (final Exception ex) {
-      logger.error("error in getting running triggers", ex);
-    }
-    return triggerInstanceList;
+    return this.flowTriggerInstanceLoader.getRunning();
   }
 
   /**
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 37f796e..7e8e205 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -264,6 +264,51 @@ public class FlowTriggerInstanceLoaderTest {
     assertTwoTriggerInstanceListsEqual(actual, expected, false, false);
   }
 
+  @Test
+  public void testGetRunningTriggerInstancesReturnsEmpty() throws InterruptedException {
+    final List<TriggerInstance> all = new ArrayList<>();
+    for (int i = 0; i < 15; i++) {
+      all.add(this.createTriggerInstance(this.flowTrigger, this
+          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+          + i * 10000));
+      finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+    }
+
+    this.shuffleAndUpload(all);
+
+    final Collection<TriggerInstance> running = this.triggerInstLoader.getRunning();
+    assertThat(running).isEmpty();
+  }
+
+  @Test
+  public void testGetRunningTriggerInstances() throws InterruptedException {
+    final List<TriggerInstance> all = new ArrayList<>();
+    for (int i = 0; i < 15; i++) {
+      all.add(this.createTriggerInstance(this.flowTrigger, this
+          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+          + i * 10000));
+      if (i <= 3) {
+        finalizeTriggerInstanceWithCancelled(all.get(i));
+      } else if (i <= 6) {
+        finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+      } else if (i <= 9) {
+        finalizeTriggerInstanceWithCancelling(all.get(i));
+      }
+      //sleep for a while to ensure endtime is different for each trigger instance
+      Thread.sleep(1000);
+    }
+
+    this.shuffleAndUpload(all);
+
+    final List<TriggerInstance> finished = all.subList(7, all.size());
+
+    final List<TriggerInstance> expected = new ArrayList<>(finished);
+    expected.sort(Comparator.comparing(TriggerInstance::getStartTime));
+
+    final Collection<TriggerInstance> running = this.triggerInstLoader.getRunning();
+    assertTwoTriggerInstanceListsEqual(new ArrayList<>(running), expected, true, true);
+  }
+
   private void assertTwoTriggerInstanceListsEqual(final List<TriggerInstance> actual,
       final List<TriggerInstance> expected, final boolean ignoreFlowTrigger,
       final boolean keepOriginalOrder) {
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
index d8c2999..66d4ce7 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -86,6 +86,17 @@ public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader 
   }
 
   @Override
+  public Collection<TriggerInstance> getRunning() {
+    final List<TriggerInstance> res = new ArrayList<>();
+    for (final TriggerInstance inst : this.triggerInstances) {
+      if (!Status.isDone(inst.getStatus())) {
+        res.add(inst);
+      }
+    }
+    return res;
+  }
+
+  @Override
   public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
     for (final TriggerInstance inst : this.triggerInstances) {
       if (inst.getId().equals(triggerInstanceId)) {