diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
index 53bd285..dc5eab6 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -55,4 +55,8 @@ public class ServerProperties {
// The property is used for the web server to get the host name of the executor when running in SOLO mode.
public static final String EXECUTOR_HOST = "executor.host";
+
+ // Max flow running time in minutes; the server kills flows that have been running longer than this setting.
+ // If not set, it defaults to 10 days (60 * 24 * 10 minutes); if <= 0, there is no restriction on running time.
+ public static final String AZKABAN_MAX_FLOW_RUNNING_MINS = "azkaban.server.flow.max.running.minutes";
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1edbe80..3a41b82 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -93,7 +93,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private ExecutionOptions.FailureAction failureAction;
// Sync object for queuing
- private Object mainSyncObj = new Object();
+ private final Object mainSyncObj = new Object();
// Properties map
private Props azkabanProps;
@@ -172,6 +172,10 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = executorService;
this.finishedNodes = new SwapQueue<ExecutableNode>();
this.azkabanProps = azkabanProps;
+
+ // Create the logger and execution dir when the FlowRunner is constructed rather than when the flow starts running.
+ // This avoids an NPE when the logger is used while the flow is still in the preparing state.
+ createLogger(this.flow.getFlowId());
}
public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -267,9 +271,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
flow.setInputProps(commonFlowProps);
- // Create execution dir
- createLogger(flowId);
-
if (this.watcher != null) {
this.watcher.setLogger(logger);
}
@@ -309,6 +310,11 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+
+ /**
+ * Sets up the logger and execution dir for the given flow.
+ * @param flowId id of the flow; combined with the execution id to form the logger name
+ */
private void createLogger(String flowId) {
// Create logger
String loggerName = execId + "." + flowId;
@@ -905,19 +911,15 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public void kill(String user) {
- synchronized (mainSyncObj) {
- logger.info("Flow killed by " + user);
- flow.setStatus(Status.KILLED);
- kill();
- updateFlow();
- }
- interrupt();
+ logger.info("Flow killed by " + user); // record which user requested the kill
+ kill(); // kill() returns early if the flow is already killed; otherwise it sets Status.KILLED, calls updateFlow() and then interrupt()
}
- private void kill() {
+ public void kill() {
synchronized (mainSyncObj) {
+ if(flowKilled) return;
logger.info("Kill has been called on flow " + execId);
-
+ flow.setStatus(Status.KILLED);
// If the flow is paused, then we'll also unpause
flowPaused = false;
flowKilled = true;
@@ -933,7 +935,9 @@ public class FlowRunner extends EventHandler implements Runnable {
for (JobRunner runner : activeJobRunners) {
runner.kill();
}
+ updateFlow();
}
+ interrupt();
}
public void retryFailures(String user) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 9a77d21..3d5c257 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,11 +16,14 @@
package azkaban.execapp;
+import azkaban.constants.ServerProperties;
+import azkaban.executor.Status;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -275,10 +278,15 @@ public class FlowRunnerManager implements EventListener,
// Every 2 mins clean the recently finished list
private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
+ // Every 5 mins kill flows running longer than allowed max running time
+ private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
+
private boolean shutdown = false;
private long lastExecutionDirCleanTime = -1;
private long lastOldProjectCleanTime = -1;
private long lastRecentlyFinishedCleanTime = -1;
+ private long lastLongRunningFlowCleanTime = -1;
+ private final long flowMaxRunningTimeInMins = azkabanProps.getInt(ServerProperties.AZKABAN_MAX_FLOW_RUNNING_MINS, 60 * 24 * 10);
public CleanerThread() {
this.setName("FlowRunnerManager-Cleaner-Thread");
@@ -291,6 +299,11 @@ public class FlowRunnerManager implements EventListener,
this.interrupt();
}
+ private boolean isFlowRunningLongerThan(ExecutableFlow flow, long flowMaxRunningTimeInMins) { // true when the flow is in one of the statuses below, has a start time > 0, and has run for at least the given number of minutes
+ Set<Status> nonFinishingStatusAfterFlowStarts = new HashSet<>(Arrays.asList(Status.RUNNING, Status.QUEUED, Status.PAUSED, Status.FAILED_FINISHING)); // only flows in these statuses are candidates for the running-time check
+ return nonFinishingStatusAfterFlowStarts.contains(flow.getStatus()) && flow.getStartTime() > 0 && TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()-flow.getStartTime()) >= flowMaxRunningTimeInMins; // elapsed millis are truncated to whole minutes before the >= comparison
+ }
+
public void run() {
while (!shutdown) {
synchronized (this) {
@@ -318,6 +331,17 @@ public class FlowRunnerManager implements EventListener,
lastExecutionDirCleanTime = currentTime;
}
+ if (flowMaxRunningTimeInMins > 0 && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS > lastLongRunningFlowCleanTime) {
+ logger.info(String.format("Killing long jobs running longer than %s mins", flowMaxRunningTimeInMins));
+ for (FlowRunner flowRunner : runningFlows.values()) {
+ if (isFlowRunningLongerThan(flowRunner.getExecutableFlow(), flowMaxRunningTimeInMins)) {
+ logger.info(String.format("Killing job [id: %s, status: %s]. It has been running for %s mins", flowRunner.getExecutableFlow().getId(), flowRunner.getExecutableFlow().getStatus(), TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis()-flowRunner.getExecutableFlow().getStartTime())));
+ flowRunner.kill();
+ }
+ }
+ lastLongRunningFlowCleanTime = currentTime;
+ }
+
wait(RECENTLY_FINISHED_TIME_TO_LIVE);
} catch (InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");