azkaban-aplcache

Catch exceptions in individual triggers and avoid complete

7/13/2015 6:33:33 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 1998496..cdd1160 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -262,43 +262,40 @@ public class TriggerManager extends EventHandler implements
 
       // sweep through the rest of them
       for (Trigger t : triggers) {
-        scannerStage = "Checking for trigger " + t.getTriggerId();
-
-        boolean shouldSkip = true;
-        if (shouldSkip && t.getInfo() != null
-            && t.getInfo().containsKey("monitored.finished.execution")) {
-          int execId =
-              Integer.valueOf((String) t.getInfo().get(
-                  "monitored.finished.execution"));
-          if (justFinishedFlows.containsKey(execId)) {
-            logger
-                .info("Monitored execution has finished. Checking trigger earlier "
-                    + t.getTriggerId());
+        try {
+          scannerStage = "Checking for trigger " + t.getTriggerId();
+
+          boolean shouldSkip = true;
+          if (shouldSkip && t.getInfo() != null && t.getInfo().containsKey("monitored.finished.execution")) {
+            int execId = Integer.valueOf((String) t.getInfo().get("monitored.finished.execution"));
+            if (justFinishedFlows.containsKey(execId)) {
+              logger.info("Monitored execution has finished. Checking trigger earlier " + t.getTriggerId());
+              shouldSkip = false;
+            }
+          }
+          if (shouldSkip && t.getNextCheckTime() > now) {
             shouldSkip = false;
           }
-        }
-        if (shouldSkip && t.getNextCheckTime() > now) {
-          shouldSkip = false;
-        }
 
-        if (shouldSkip) {
-          logger.info("Skipping trigger" + t.getTriggerId() + " until "
-              + t.getNextCheckTime());
-        }
+          if (shouldSkip) {
+            logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
+          }
 
-        logger.info("Checking trigger " + t.getTriggerId());
-        if (t.getStatus().equals(TriggerStatus.READY)) {
-          if (t.triggerConditionMet()) {
-            onTriggerTrigger(t);
-          } else if (t.expireConditionMet()) {
-            onTriggerExpire(t);
+          logger.info("Checking trigger " + t.getTriggerId());
+          if (t.getStatus().equals(TriggerStatus.READY)) {
+            if (t.triggerConditionMet()) {
+              onTriggerTrigger(t);
+            } else if (t.expireConditionMet()) {
+              onTriggerExpire(t);
+            }
           }
-        }
-        if (t.getStatus().equals(TriggerStatus.EXPIRED)
-            && t.getSource().equals("azkaban")) {
-          removeTrigger(t);
-        } else {
-          t.updateNextCheckTime();
+          if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+            removeTrigger(t);
+          } else {
+            t.updateNextCheckTime();
+          }
+        } catch (Throwable th) {
+          logger.error("Failed to process trigger with id : " + t.getTriggerId(), th);
         }
       }
     }