azkaban-developers

Flow trigger service enhancement (#1721) The PR consists

4/6/2018 6:49:03 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 64d728e..6daca2c 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -31,7 +31,6 @@ import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManager;
 import com.google.common.io.Files;
 import java.io.File;
-import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -136,11 +135,19 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
 
   @Override
   public Collection<TriggerInstance> getIncompleteTriggerInstances() {
-    Collection<TriggerInstance> unfinished = Collections.EMPTY_LIST;
+    final Collection<TriggerInstance> unfinished = new ArrayList<>();
     try {
-      unfinished = this.dbOperator
+      final Collection<TriggerInstance> triggerInstances = this.dbOperator
           .query(SELECT_ALL_PENDING_EXECUTIONS, new TriggerInstanceHandler());
 
+      // select incomplete trigger instances
+      for (final TriggerInstance triggerInst : triggerInstances) {
+        if (!Status.isDone(triggerInst.getStatus()) || (triggerInst.getStatus() == Status.SUCCEEDED
+            && triggerInst.getFlowExecId() == Constants.UNASSIGNED_EXEC_ID)) {
+          unfinished.add(triggerInst);
+        }
+      }
+
       // backfilling flow trigger for unfinished trigger instances
       // dedup flow config id with a set to avoid downloading/parsing same flow file multiple times
 
@@ -165,7 +172,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
           } else {
             logger.error("Unable to find flow file for " + flowConfigID);
           }
-        } catch (final IOException ex) {
+        } catch (final Exception ex) {
           logger.error("error in getting flow file", ex);
         } finally {
           FlowLoaderUtils.cleanUpDir(tempDir);
@@ -181,7 +188,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
     } catch (final SQLException ex) {
       handleSQLException(ex);
     }
-
     return unfinished;
   }
 
@@ -314,7 +320,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
         } else {
           logger.error("Unable to find flow file for " + triggerInstanceId);
         }
-      } catch (final IOException ex) {
+      } catch (final Exception ex) {
         logger.error("error in getting flow file", ex);
       } finally {
         FlowLoaderUtils.cleanUpDir(tempDir);
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index de98efb..cb6dd5b 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -248,13 +248,34 @@ public class FlowTriggerService {
   public void recoverIncompleteTriggerInstances() {
     final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
         .getIncompleteTriggerInstances();
-    //todo chengren311: what if flow trigger is not found?
     for (final TriggerInstance triggerInstance : unfinishedTriggerInstances) {
       if (triggerInstance.getFlowTrigger() != null) {
         recover(triggerInstance);
       } else {
-        logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null ",
-            triggerInstance.getId()));
+        logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null,"
+            + " cancelling it ", triggerInstance.getId()));
+
+        //finalize unrecoverable trigger instances
+        // the following situation would cause trigger instances unrecoverable:
+        // 1. project A with flow A associated with flow trigger A is uploaded
+        // 2. flow trigger A starts to run
+        // 3. project A with flow B without any flow trigger is uploaded
+        // 4. web server restarts
+        // in this case, flow trigger instance of flow trigger A will be equipped with latest
+        // project, thus failing to find the flow trigger since new project doesn't contain flow
+        // trigger at all
+        if (isDoneButFlowNotExecuted(triggerInstance)) {
+          triggerInstance.setFlowExecId(Constants.FAILED_EXEC_ID);
+          this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInstance);
+        } else {
+          for (final DependencyInstance depInst : triggerInstance.getDepInstances()) {
+            if (!Status.isDone(depInst.getStatus())) {
+              processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED,
+                  CancellationCause.FAILURE);
+              this.triggerProcessor.processTermination(depInst.getTriggerInstance());
+            }
+          }
+        }
       }
     }
   }
@@ -516,6 +537,7 @@ public class FlowTriggerService {
     this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
     this.multiThreadsExecutorService.shutdown();
     this.multiThreadsExecutorService.shutdownNow();
+    this.triggerProcessor.shutdown();
     this.triggerPluginManager.shutdown();
   }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
index 8825987..bf0e72f 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
@@ -38,8 +38,9 @@ public class TriggerInstance {
 
   //todo chengren311: convert it to builder
   public TriggerInstance(final String id, final FlowTrigger flowTrigger, final String flowId,
-      final int flowVersion, final String submitUser, final List<DependencyInstance> depInstances,
-      final int flowExecId, final Project project) {
+      final int flowVersion, final String submitUser, final List<DependencyInstance>
+      depInstances, final int flowExecId, final Project project) {
+
     this.depInstances = ImmutableList.copyOf(depInstances);
     this.id = id;
     this.flowTrigger = flowTrigger;
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index c584dc7..f17c499 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -60,6 +60,11 @@ public class TriggerInstanceProcessor {
     this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
   }
 
+  public void shutdown() {
+    this.executorService.shutdown();
+    this.executorService.shutdownNow();
+  }
+
   private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
     try {
       final Project project = triggerInst.getProject();