azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunnerManager.java 94(+77 -17)
Details
src/java/azkaban/execapp/FlowRunnerManager.java 94(+77 -17)
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 0275778..68f5882 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -17,6 +17,7 @@
package azkaban.execapp;
import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
@@ -56,7 +57,7 @@ public class FlowRunnerManager implements EventListener {
private File executionDirectory;
private File projectDirectory;
- private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 120000; // recently finished secs to clean up. 1 minute
+ private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60*1000; // recently finished secs to clean up. 1 minute
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
private Map<Pair<Integer,Integer>, ProjectVersion> installedProjects = new ConcurrentHashMap<Pair<Integer,Integer>, ProjectVersion>();
@@ -78,12 +79,17 @@ public class FlowRunnerManager implements EventListener {
private long lastSubmitterThreadCheckTime = -1;
private long lastCleanerThreadCheckTime = -1;
+ private long executionDirRetention = 7*24*60*60*1000;
+
+ private Object executionDirDeletionSync = new Object();
public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
//JobWrappingFactory.init(props, getClass().getClassLoader());
+ executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
+ logger.info("Execution dir retention set to " + executionDirRetention + " ms");
if (!executionDirectory.exists()) {
executionDirectory.mkdirs();
@@ -146,7 +152,17 @@ public class FlowRunnerManager implements EventListener {
}
private class CleanerThread extends Thread {
+ // Every hour, clean execution dir.
+ private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60*60*1000;
+ // Every 5 mins clean the old project dir
+ private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5*60*1000;
+ // Every 2 mins clean the recently finished list
+ private static final long RECENTLY_FINISHED_INTERVAL_MS = 2*60*1000;
+
private boolean shutdown = false;
+ private long lastExecutionDirCleanTime = -1;
+ private long lastOldProjectCleanTime = -1;
+ private long lastRecentlyFinishedCleanTime = -1;
public CleanerThread() {
this.setName("FlowRunnerManager-Cleaner-Thread");
@@ -163,10 +179,28 @@ public class FlowRunnerManager implements EventListener {
synchronized (this) {
try {
lastCleanerThreadCheckTime = System.currentTimeMillis();
- wait(RECENTLY_FINISHED_TIME_TO_LIVE);
+
// Cleanup old stuff.
- cleanRecentlyFinished();
- cleanOlderProjects();
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > lastRecentlyFinishedCleanTime) {
+ logger.info("Cleaning recently finished");
+ cleanRecentlyFinished();
+ lastRecentlyFinishedCleanTime = currentTime;
+ }
+
+ if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime) {
+ logger.info("Cleaning old projects");
+ cleanOlderProjects();
+ lastOldProjectCleanTime = currentTime;
+ }
+
+ if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > lastExecutionDirCleanTime) {
+ logger.info("Cleaning old execution dirs");
+ cleanOlderExecutionDirs();
+ lastExecutionDirCleanTime = currentTime;
+ }
+
+ wait(RECENTLY_FINISHED_TIME_TO_LIVE);
} catch (InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
}
@@ -174,6 +208,42 @@ public class FlowRunnerManager implements EventListener {
}
}
+ private void cleanOlderExecutionDirs() {
+ File dir = executionDirectory;
+
+ final long pastTimeThreshold = System.currentTimeMillis() - executionDirRetention;
+ File[] executionDirs = dir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File path) {
+ if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ for (File exDir : executionDirs) {
+ try {
+ int execId = Integer.valueOf(exDir.getName());
+ if (runningFlows.containsKey(execId) || recentlyFinishedFlows.containsKey(execId)) {
+ continue;
+ }
+ }
+ catch (NumberFormatException e) {
+ logger.error("Can't delete exec dir " + exDir.getName() + " it is not a number");
+ continue;
+ }
+
+ synchronized(executionDirDeletionSync) {
+ try {
+ FileUtils.deleteDirectory(exDir);
+ } catch (IOException e) {
+ logger.error("Error cleaning execution dir " + exDir.getPath(), e);
+ }
+ }
+ }
+ }
+
private void cleanRecentlyFinished() {
long cleanupThreshold = System.currentTimeMillis() - RECENTLY_FINISHED_TIME_TO_LIVE;
ArrayList<Integer> executionToKill = new ArrayList<Integer>();
@@ -233,6 +303,7 @@ public class FlowRunnerManager implements EventListener {
}
}
+
public void submitFlow(int execId) throws ExecutorManagerException {
// Load file and submit
if (runningFlows.containsKey(execId)) {
@@ -405,17 +476,6 @@ public class FlowRunnerManager implements EventListener {
ExecutableFlow flow = flowRunner.getExecutableFlow();
recentlyFinishedFlows.put(flow.getExecutionId(), flow);
- File dir = flowRunner.getExecutionDir();
- if (dir != null && dir.exists()) {
- try {
- synchronized(dir) {
- if(flow.getStatus() == Status.SUCCEEDED)
- FileUtils.deleteDirectory(dir);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
runningFlows.remove(flow.getExecutionId());
}
}
@@ -429,7 +489,7 @@ public class FlowRunnerManager implements EventListener {
File dir = runner.getExecutionDir();
if (dir != null && dir.exists()) {
try {
- synchronized(dir) {
+ synchronized(executionDirDeletionSync) {
if (!dir.exists()) {
throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
}
@@ -459,7 +519,7 @@ public class FlowRunnerManager implements EventListener {
File dir = runner.getExecutionDir();
if (dir != null && dir.exists()) {
try {
- synchronized(dir) {
+ synchronized(executionDirDeletionSync) {
if (!dir.exists()) {
throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index bbca3cf..71eb652 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,6 +35,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -179,7 +180,10 @@ public class AzkabanWebServer implements AzkabanServer {
configureMBeanServer();
}
-
+ private void setupLoggers() {
+ //FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
+
+ }
private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
this.viewerPlugins = viewerPlugins;