azkaban-uncached

Successful executions now keep the execution directories

2/20/2013 10:35:11 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 0275778..68f5882 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -17,6 +17,7 @@
 package azkaban.execapp;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
@@ -56,7 +57,7 @@ public class FlowRunnerManager implements EventListener {
 	private File executionDirectory;
 	private File projectDirectory;
 
-	private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 120000; // recently finished secs to clean up. 1 minute
+	private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60*1000; // recently finished secs to clean up. 1 minute
 	
 	private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
 	private Map<Pair<Integer,Integer>, ProjectVersion> installedProjects = new ConcurrentHashMap<Pair<Integer,Integer>, ProjectVersion>();
@@ -78,12 +79,17 @@ public class FlowRunnerManager implements EventListener {
 	
 	private long lastSubmitterThreadCheckTime = -1;
 	private long lastCleanerThreadCheckTime = -1;
+	private long executionDirRetention = 7*24*60*60*1000;
+	
+	private Object executionDirDeletionSync = new Object();
 	
 	public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
 		executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
 		projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
 		
 		//JobWrappingFactory.init(props, getClass().getClassLoader());
+		executionDirRetention = props.getLong("execution.dir.retention", executionDirRetention);
+		logger.info("Execution dir retention set to " + executionDirRetention + " ms");
 		
 		if (!executionDirectory.exists()) {
 			executionDirectory.mkdirs();
@@ -146,7 +152,17 @@ public class FlowRunnerManager implements EventListener {
 	}
 	
 	private class CleanerThread extends Thread {
+		// Every hour, clean execution dir.
+		private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60*60*1000;
+		// Every 5 mins clean the old project dir
+		private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5*60*1000;
+		// Every 2 mins clean the recently finished list
+		private static final long RECENTLY_FINISHED_INTERVAL_MS = 2*60*1000;
+		
 		private boolean shutdown = false;
+		private long lastExecutionDirCleanTime = -1;
+		private long lastOldProjectCleanTime = -1;
+		private long lastRecentlyFinishedCleanTime = -1;
 		
 		public CleanerThread() {
 			this.setName("FlowRunnerManager-Cleaner-Thread");
@@ -163,10 +179,28 @@ public class FlowRunnerManager implements EventListener {
 				synchronized (this) {
 					try {
 						lastCleanerThreadCheckTime = System.currentTimeMillis();
-						wait(RECENTLY_FINISHED_TIME_TO_LIVE);
+						
 						// Cleanup old stuff.
-						cleanRecentlyFinished();
-						cleanOlderProjects();
+						long currentTime = System.currentTimeMillis();
+						if (currentTime - RECENTLY_FINISHED_INTERVAL_MS > lastRecentlyFinishedCleanTime) {
+							logger.info("Cleaning recently finished");
+							cleanRecentlyFinished();
+							lastRecentlyFinishedCleanTime = currentTime;
+						}
+						
+						if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > lastOldProjectCleanTime) {
+							logger.info("Cleaning old projects");
+							cleanOlderProjects();
+							lastOldProjectCleanTime = currentTime;
+						}
+						
+						if (currentTime - EXECUTION_DIR_CLEAN_INTERVAL_MS > lastExecutionDirCleanTime) {
+							logger.info("Cleaning old execution dirs");
+							cleanOlderExecutionDirs();
+							lastExecutionDirCleanTime = currentTime;
+						}
+						
+						wait(RECENTLY_FINISHED_TIME_TO_LIVE);
 					} catch (InterruptedException e) {
 						logger.info("Interrupted. Probably to shut down.");
 					}
@@ -174,6 +208,42 @@ public class FlowRunnerManager implements EventListener {
 			}
 		}
 	
+		private void cleanOlderExecutionDirs() {
+			File dir = executionDirectory;
+			
+			final long pastTimeThreshold = System.currentTimeMillis() - executionDirRetention;
+			File[] executionDirs = dir.listFiles(new FileFilter() {
+				@Override
+				public boolean accept(File path) {
+					if (path.isDirectory() && path.lastModified() < pastTimeThreshold) {
+						return true;
+					}
+					return false;
+				}
+			});
+			
+			for (File exDir : executionDirs) {
+				try {
+					int execId = Integer.valueOf(exDir.getName());
+					if (runningFlows.containsKey(execId) || recentlyFinishedFlows.containsKey(execId)) {
+						continue;
+					}
+				}
+				catch (NumberFormatException e) {
+					logger.error("Can't delete exec dir " + exDir.getName() + " it is not a number");
+					continue;
+				}
+			
+				synchronized(executionDirDeletionSync) {
+					try {
+						FileUtils.deleteDirectory(exDir);
+					} catch (IOException e) {
+						logger.error("Error cleaning execution dir " + exDir.getPath(), e);
+					}
+				}
+			}
+		}
+		
 		private void cleanRecentlyFinished() {
 			long cleanupThreshold = System.currentTimeMillis() - RECENTLY_FINISHED_TIME_TO_LIVE;
 			ArrayList<Integer> executionToKill = new ArrayList<Integer>();
@@ -233,6 +303,7 @@ public class FlowRunnerManager implements EventListener {
 		}
 	}
 	
+	
 	public void submitFlow(int execId) throws ExecutorManagerException {
 		// Load file and submit
 		if (runningFlows.containsKey(execId)) {
@@ -405,17 +476,6 @@ public class FlowRunnerManager implements EventListener {
 			ExecutableFlow flow = flowRunner.getExecutableFlow();
 			recentlyFinishedFlows.put(flow.getExecutionId(), flow);
 
-			File dir = flowRunner.getExecutionDir();
-			if (dir != null && dir.exists()) {
-				try {
-					synchronized(dir) {
-						if(flow.getStatus() == Status.SUCCEEDED)
-							FileUtils.deleteDirectory(dir);
-					}
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
-			}
 			runningFlows.remove(flow.getExecutionId());
 		}
 	}
@@ -429,7 +489,7 @@ public class FlowRunnerManager implements EventListener {
 		File dir = runner.getExecutionDir();
 		if (dir != null && dir.exists()) {
 			try {
-				synchronized(dir) {
+				synchronized(executionDirDeletionSync) {
 					if (!dir.exists()) {
 						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
 					}
@@ -459,7 +519,7 @@ public class FlowRunnerManager implements EventListener {
 		File dir = runner.getExecutionDir();
 		if (dir != null && dir.exists()) {
 			try {
-				synchronized(dir) {
+				synchronized(executionDirDeletionSync) {
 					if (!dir.exists()) {
 						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
 					}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index bbca3cf..71eb652 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,6 +35,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.FileAppender;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
@@ -179,7 +180,10 @@ public class AzkabanWebServer implements AzkabanServer {
 		configureMBeanServer();
 	}
 	
-	
+	private void setupLoggers() {
+		//FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
+		
+	}
 
 	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
 		this.viewerPlugins = viewerPlugins;