azkaban-aplcache

If a flow runs for longer than 10 days (configurable), kill it.

4/4/2017 7:59:23 PM
3.18.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
index 53bd285..dc5eab6 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -55,4 +55,8 @@ public class ServerProperties {
 
   // The property is used for the web server to get the host name of the executor when running in SOLO mode.
   public static final String EXECUTOR_HOST = "executor.host";
+
+  // Max flow running time in mins, server will kill flows running longer than this setting.
+  // if not set or <= 0, then there's no restriction on running time.
+  public static final String AZKABAN_MAX_FLOW_RUNNING_MINS = "azkaban.server.flow.max.running.minutes";
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1edbe80..3a41b82 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -93,7 +93,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private ExecutionOptions.FailureAction failureAction;
 
   // Sync object for queuing
-  private Object mainSyncObj = new Object();
+  private final Object mainSyncObj = new Object();
 
   // Properties map
   private Props azkabanProps;
@@ -172,6 +172,10 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.executorService = executorService;
     this.finishedNodes = new SwapQueue<ExecutableNode>();
     this.azkabanProps = azkabanProps;
+
+    // Create logger and execution dir in flowRunner initialization instead of flow runtime to avoid NPE
+    // where the uninitialized logger is used in flow preparing state
+    createLogger(this.flow.getFlowId());
   }
 
   public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -267,9 +271,6 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
     flow.setInputProps(commonFlowProps);
 
-    // Create execution dir
-    createLogger(flowId);
-
     if (this.watcher != null) {
       this.watcher.setLogger(logger);
     }
@@ -309,6 +310,11 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
+
+  /**
+   * setup logger and execution dir for the flowId
+   * @param flowId
+   */
   private void createLogger(String flowId) {
     // Create logger
     String loggerName = execId + "." + flowId;
@@ -905,19 +911,15 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   public void kill(String user) {
-    synchronized (mainSyncObj) {
-      logger.info("Flow killed by " + user);
-      flow.setStatus(Status.KILLED);
-      kill();
-      updateFlow();
-    }
-    interrupt();
+    logger.info("Flow killed by " + user);
+    kill();
   }
 
-  private void kill() {
+  public void kill() {
     synchronized (mainSyncObj) {
+      if(flowKilled) return;
       logger.info("Kill has been called on flow " + execId);
-
+      flow.setStatus(Status.KILLED);
       // If the flow is paused, then we'll also unpause
       flowPaused = false;
       flowKilled = true;
@@ -933,7 +935,9 @@ public class FlowRunner extends EventHandler implements Runnable {
       for (JobRunner runner : activeJobRunners) {
         runner.kill();
       }
+      updateFlow();
     }
+    interrupt();
   }
 
   public void retryFailures(String user) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 9a77d21..3d5c257 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,11 +16,14 @@
 
 package azkaban.execapp;
 
+import azkaban.constants.ServerProperties;
+import azkaban.executor.Status;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -275,10 +278,15 @@ public class FlowRunnerManager implements EventListener,
     // Every 2 mins clean the recently finished list
     private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
 
+    // Every 5 mins kill flows running longer than allowed max running time
+    private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
+
     private boolean shutdown = false;
     private long lastExecutionDirCleanTime = -1;
     private long lastOldProjectCleanTime = -1;
     private long lastRecentlyFinishedCleanTime = -1;
+    private long lastLongRunningFlowCleanTime = -1;
+    private final long flowMaxRunningTimeInMins = azkabanProps.getInt(ServerProperties.AZKABAN_MAX_FLOW_RUNNING_MINS, 60 * 24 * 10);
 
     public CleanerThread() {
       this.setName("FlowRunnerManager-Cleaner-Thread");
@@ -291,6 +299,11 @@ public class FlowRunnerManager implements EventListener,
       this.interrupt();
     }
 
+    private boolean isFlowRunningLongerThan(ExecutableFlow flow, long flowMaxRunningTimeInMins) {
+      Set<Status> nonFinishingStatusAfterFlowStarts = new HashSet<>(Arrays.asList(Status.RUNNING, Status.QUEUED, Status.PAUSED, Status.FAILED_FINISHING));
+      return nonFinishingStatusAfterFlowStarts.contains(flow.getStatus()) && flow.getStartTime() > 0 && TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()-flow.getStartTime()) >= flowMaxRunningTimeInMins;
+    }
+
     public void run() {
       while (!shutdown) {
         synchronized (this) {
@@ -318,6 +331,17 @@ public class FlowRunnerManager implements EventListener,
               lastExecutionDirCleanTime = currentTime;
             }
 
+            if (flowMaxRunningTimeInMins > 0 && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS > lastLongRunningFlowCleanTime) {
+              logger.info(String.format("Killing long jobs running longer than %s mins", flowMaxRunningTimeInMins));
+              for (FlowRunner flowRunner : runningFlows.values()) {
+                if (isFlowRunningLongerThan(flowRunner.getExecutableFlow(), flowMaxRunningTimeInMins)) {
+                  logger.info(String.format("Killing job [id: %s, status: %s]. It has been running for %s mins", flowRunner.getExecutableFlow().getId(), flowRunner.getExecutableFlow().getStatus(), TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()-flowRunner.getExecutableFlow().getStartTime())));
+                  flowRunner.kill();
+                }
+              }
+              lastLongRunningFlowCleanTime = currentTime;
+            }
+
             wait(RECENTLY_FINISHED_TIME_TO_LIVE);
           } catch (InterruptedException e) {
             logger.info("Interrupted. Probably to shut down.");