azkaban-uncached
Changes
conf/azkaban.properties 4(+3 -1)
src/java/azkaban/utils/Utils.java 8(+0 -8)
src/web/js/azkaban.schedule.svg.js 2(+0 -2)
Details
conf/azkaban.properties 4(+3 -1)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 7884f10..72ef395 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -46,4 +46,6 @@ mail.host=
job.failure.email=
job.success.email=
-lockdown.create.projects=false
\ No newline at end of file
+lockdown.create.projects=false
+
+cache.directory=cache
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..5e580f0 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
@@ -70,12 +71,15 @@ public class ExecutorManager {
private long lastThreadCheckTime = -1;
+ File cacheDir;
+
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
executorHost = props.getString("executor.host", "localhost");
executorPort = props.getInt("executor.port");
+ cacheDir = new File(props.getString("cache.directory", "cache"));
mailer = new ExecutorMailer(props);
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
@@ -663,7 +667,7 @@ public class ExecutorManager {
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
- ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId(), cacheDir);
}
recentlyFinished.put(flow.getExecutionId(), flow);
}
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 28fc11c..5c8c1b6 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -16,10 +16,13 @@ import azkaban.webapp.AzkabanWebServer;
public class ScheduleStatisticManager {
private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
- private static File cacheDirectory = new File("cache/schedule-statistics");
+ private static File cacheDirectory;
private static final int STAT_NUMBERS = 10;
public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+ if (cacheDirectory == null) {
+ setCacheFolder(new File(server.getServerProps().getString("cache.directory", "cache")));
+ }
Map<String, Object> data = loadCache(scheduleId);
if (data != null) {
return data;
@@ -74,7 +77,8 @@ public class ScheduleStatisticManager {
return data;
}
- public static void invalidateCache(int scheduleId) {
+ public static void invalidateCache(int scheduleId, File cacheDir) {
+ setCacheFolder(cacheDir);
// This should be silent and not fail
try {
Object lock = getLock(scheduleId);
@@ -147,4 +151,10 @@ public class ScheduleStatisticManager {
cacheLock.remove(scheduleId);
}
}
+
+ private static void setCacheFolder(File cacheDir) {
+ if (cacheDirectory == null) {
+ cacheDirectory = new File(cacheDir, "schedule-statistics");
+ }
+ }
}
diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
index 81821c6..cc1b4f5 100644
--- a/src/java/azkaban/utils/SplitterOutputStream.java
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -38,15 +38,31 @@ public class SplitterOutputStream extends OutputStream {
@Override
public void flush() throws IOException {
+ IOException exception = null;
for (OutputStream output : outputs) {
- output.flush();
+ try {
+ output.flush();
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw exception;
}
}
@Override
public void close() throws IOException {
+ IOException exception = null;
for (OutputStream output : outputs) {
- output.close();
+ try {
+ output.close();
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw exception;
}
}
src/java/azkaban/utils/Utils.java 8(+0 -8)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index 03b1bab..fb4419f 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -277,12 +277,4 @@ public class Utils {
Method method = clazz.getDeclaredMethod(methodName, argTypes);
return method.invoke(null, args);
}
-
- public static void copyStream(InputStream input, OutputStream output) throws IOException {
- byte[] buffer = new byte[1024];
- int bytesRead;
- while ((bytesRead = input.read(buffer)) != -1) {
- output.write(buffer, 0, bytesRead);
- }
- }
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 312b7da..c2d068d 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,8 @@
package azkaban.webapp.servlet;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -33,6 +35,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -64,7 +67,6 @@ import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.JSONUtils;
import azkaban.utils.SplitterOutputStream;
-import azkaban.utils.Utils;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
@@ -416,7 +418,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
int loadAll = getIntParam(req, "loadAll");
// Cache file
- File cache = new File("cache/schedule-history/" + startTime + ".cache");
+ String cacheDir = getApplication().getServerProps().getString("cache.directory", "cache");
+ File cacheDirFile = new File(cacheDir, "schedule-history");
+ File cache = new File(cacheDirFile, startTime + ".cache");
cache.getParentFile().mkdirs();
if (useCache) {
@@ -427,8 +431,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
if (cacheExists) {
// Send the cache instead
- InputStream cacheInput = new FileInputStream(cache);
- Utils.copyStream(cacheInput, resp.getOutputStream());
+ InputStream cacheInput = new BufferedInputStream(new FileInputStream(cache));
+ IOUtils.copy(cacheInput, resp.getOutputStream());
// System.out.println("Using cache copy for " + start);
return;
}
@@ -441,8 +445,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ExecutorManager executorManager = server.getExecutorManager();
history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
} catch (ExecutorManagerException e) {
- // Return empty should suffice
+ logger.error(e);
}
+
HashMap<String, Object> ret = new HashMap<String, Object>();
List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
ret.put("items", output);
@@ -462,14 +467,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
//Create cache file
- File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
+ File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
cacheTemp.createNewFile();
- OutputStream cacheOutput = new FileOutputStream(cacheTemp);
-
+ OutputStream cacheOutput = new BufferedOutputStream(new FileOutputStream(cacheTemp));
+ OutputStream outputStream = new SplitterOutputStream(cacheOutput, resp.getOutputStream());
// Write to both the cache file and web output
- JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
- // System.out.println("Writing cache file for " + start);
- // JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+ JSONUtils.toJSON(ret, outputStream, false);
+ cacheOutput.close();
//Move cache file
synchronized (this) {
src/web/js/azkaban.schedule.svg.js 2(+0 -2)
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index e9e583c..89118d1 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -432,8 +432,6 @@ $(function() {
{
var items = data.items;
- console.log(data);
-
//Sort items by day
for(var i = 0; i < items.length; i++)
{