azkaban-uncached

Fixed some minor stuff

5/23/2013 6:37:53 PM

Details

diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 7884f10..72ef395 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -46,4 +46,6 @@ mail.host=
 job.failure.email=
 job.success.email=
 
-lockdown.create.projects=false
\ No newline at end of file
+lockdown.create.projects=false
+
+cache.directory=cache
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..5e580f0 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
 
 package azkaban.executor;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.net.URI;
@@ -70,12 +71,15 @@ public class ExecutorManager {
 	
 	private long lastThreadCheckTime = -1;
 	
+	File cacheDir;
+	
 	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
 		this.executorLoader = loader;
 		this.loadRunningFlows();
 		
 		executorHost = props.getString("executor.host", "localhost");
 		executorPort = props.getInt("executor.port");
+		cacheDir = new File(props.getString("cache.directory", "cache"));
 		mailer = new ExecutorMailer(props);
 		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
@@ -663,7 +667,7 @@ public class ExecutorManager {
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
 							if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
-								ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+								ScheduleStatisticManager.invalidateCache(flow.getScheduleId(), cacheDir);
 							}
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 28fc11c..5c8c1b6 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -16,10 +16,13 @@ import azkaban.webapp.AzkabanWebServer;
 
 public class ScheduleStatisticManager {
 	private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
-	private static File cacheDirectory = new File("cache/schedule-statistics");
+	private static File cacheDirectory;
 	private static final int STAT_NUMBERS = 10;
 
 	public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+		if (cacheDirectory == null) {
+			setCacheFolder(new File(server.getServerProps().getString("cache.directory", "cache")));
+		}
 		Map<String, Object> data = loadCache(scheduleId);
 		if (data != null) {
 			return data;
@@ -74,7 +77,8 @@ public class ScheduleStatisticManager {
 		return data;
 	}
 
-	public static void invalidateCache(int scheduleId) {
+	public static void invalidateCache(int scheduleId, File cacheDir) {
+		setCacheFolder(cacheDir);
 		// This should be silent and not fail
 		try {
 			Object lock = getLock(scheduleId);
@@ -147,4 +151,10 @@ public class ScheduleStatisticManager {
 			cacheLock.remove(scheduleId);
 		}
 	}
+
+	private static void setCacheFolder(File cacheDir) {
+		if (cacheDirectory == null) {
+			cacheDirectory = new File(cacheDir, "schedule-statistics");
+		}
+	}
 }
diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
index 81821c6..cc1b4f5 100644
--- a/src/java/azkaban/utils/SplitterOutputStream.java
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -38,15 +38,31 @@ public class SplitterOutputStream extends OutputStream {
 
 	@Override
 	public void flush() throws IOException {
+		IOException exception = null;
 		for (OutputStream output : outputs) {
-			output.flush();
+			try {
+				output.flush();
+			} catch (IOException e) {
+				exception = e;
+			}
+		}
+		if (exception != null) {
+			throw exception;
 		}
 	}
 
 	@Override
 	public void close() throws IOException {
+		IOException exception = null;
 		for (OutputStream output : outputs) {
-			output.close();
+			try {
+				output.close();
+			} catch (IOException e) {
+				exception = e;
+			}
+		}
+		if (exception != null) {
+			throw exception;
 		}
 	}
 
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index 03b1bab..fb4419f 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -277,12 +277,4 @@ public class Utils {
 		Method method = clazz.getDeclaredMethod(methodName, argTypes);
 		return method.invoke(null, args);
 	}
-	
-	public static void copyStream(InputStream input, OutputStream output) throws IOException {
-		byte[] buffer = new byte[1024];
-		int bytesRead;
-		while ((bytesRead = input.read(buffer)) != -1) {
-			output.write(buffer, 0, bytesRead);
-		}
-	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 312b7da..c2d068d 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,8 @@
 
 package azkaban.webapp.servlet;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -33,6 +35,7 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -64,7 +67,6 @@ import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.SplitterOutputStream;
-import azkaban.utils.Utils;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 
@@ -416,7 +418,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		int loadAll = getIntParam(req, "loadAll");
 
 		// Cache file
-		File cache = new File("cache/schedule-history/" + startTime + ".cache");
+		String cacheDir = getApplication().getServerProps().getString("cache.directory", "cache");
+		File cacheDirFile = new File(cacheDir, "schedule-history");
+		File cache = new File(cacheDirFile, startTime + ".cache");
 		cache.getParentFile().mkdirs();
 
 		if (useCache) {
@@ -427,8 +431,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			}
 			if (cacheExists) {
 				// Send the cache instead
-				InputStream cacheInput = new FileInputStream(cache);
-				Utils.copyStream(cacheInput, resp.getOutputStream());
+				InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
+				IOUtils.copy(cacheInput, resp.getOutputStream());
 				// System.out.println("Using cache copy for " + start);
 				return;
 			}
@@ -441,8 +445,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ExecutorManager executorManager = server.getExecutorManager();
 			history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
 		} catch (ExecutorManagerException e) {
-			// Return empty should suffice
+			logger.error(e);
 		}
+		
 		HashMap<String, Object> ret = new HashMap<String, Object>();
 		List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
 		ret.put("items", output);
@@ -462,14 +467,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 		
 		//Create cache file
-		File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
+		File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
 		cacheTemp.createNewFile();
-		OutputStream cacheOutput = new FileOutputStream(cacheTemp);
-
+		OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
+		OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
 		// Write to both the cache file and web output
-		JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
-		// System.out.println("Writing cache file for " + start);
-		// JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+		JSONUtils.toJSON(ret, outputStream, false);
+		cacheOutput.close();
 		
 		//Move cache file
 		synchronized (this) {
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index e9e583c..89118d1 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -432,8 +432,6 @@ $(function() {
 					{
 						var items = data.items;
 
-						console.log(data);
-
 						//Sort items by day
 						for(var i = 0; i < items.length; i++)
 						{