azkaban-aplcache

Fix race condition caused by UpdaterThread in ExecutorManager

5/10/2017 10:25:24 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 50bc5ca..3dc9393 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1300,6 +1300,13 @@ public class ExecutorManager extends EventHandler implements
                       "Failed to get update. Doing some clean up for flow "
                           + flow.getExecutionId();
 
+                  // The failure retry logic below won't work after removing the runningFlow
+                  // cache. numErrors and nextCheckTime are not stored in DB. So whenever we
+                  // fetch active flows from DB, numErrors will be initialized to default 0
+                  // and nexCheckTime will be -1. numErrors will never reach threshold and
+                  // flows will never be finalized in below case.
+                  // todo: jamiesjc will remove updaterThread and add separate clean up code
+                  // to handle errors.
                   if (activeFlow != null) {
                     ExecutionReference ref = activeFlow.getFirst();
                     int numErrors = ref.getNumErrors();
@@ -1333,13 +1340,9 @@ public class ExecutorManager extends EventHandler implements
                       finalizeFlows.add(flow);
                     }
                   } catch (ExecutorManagerException e) {
-                    ExecutableFlow flow = e.getExecutableFlow();
-                    logger.error(e);
-
-                    if (flow != null) {
-                      logger.error("Finalizing flow " + flow.getExecutionId());
-                      finalizeFlows.add(flow);
-                    }
+                    // Currently just ignore the update error. Will remove UpdaterThread and
+                    // add separate clean up code to handle errors.
+                    logger.error("Update execution failed. Ignored. ", e);
                   }
                 }
               }