azkaban-memoizeit
Changes
src/web/js/azkaban.schedule.svg.js 6(+4 -2)
Details
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index c0108e2..97581e9 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -35,6 +35,8 @@ public interface ExecutorLoader {
public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException;
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException;
+
public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startData, long endData, int skip, int num) throws ExecutorManagerException;
public void addActiveExecutableReference(ExecutionReference ref) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 07b81a8..527956d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.project.Project;
+import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
@@ -661,6 +662,9 @@ public class ExecutorManager {
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
+ if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+ }
recentlyFinished.put(flow.getExecutionId(), flow);
}
@@ -942,7 +946,11 @@ public class ExecutorManager {
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
-
+
+ public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
+ return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+ }
+
/*
* cleaner thread to clean up execution_logs, etc in DB. Runs every day.
*
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index a50b4e1..8bfbf3c 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -203,6 +203,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+
+ try {
+ List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, flowHandler, projectId, flowId, status.getNumVal(), skip, num);
+ return properties;
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
+ @Override
public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
@@ -816,13 +829,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
+ private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
@Override
public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<ExecutableFlow>emptyList();
}
-
+
List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
do {
int id = rs.getInt(1);
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
new file mode 100644
index 0000000..28fc11c
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -0,0 +1,150 @@
+package azkaban.scheduler;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.AzkabanWebServer;
+
+public class ScheduleStatisticManager {
+ private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
+ private static File cacheDirectory = new File("cache/schedule-statistics");
+ private static final int STAT_NUMBERS = 10;
+
+ public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+ Map<String, Object> data = loadCache(scheduleId);
+ if (data != null) {
+ return data;
+ }
+
+ // Calculate data and cache it
+ data = calculateStats(scheduleId, server);
+
+ saveCache(scheduleId, data);
+
+ return data;
+ }
+
+ private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) {
+ Map<String, Object> data = new HashMap<String, Object>();
+ ExecutorManager executorManager = server.getExecutorManager();
+ ScheduleManager scheduleManager = server.getScheduleManager();
+ Schedule schedule = scheduleManager.getSchedule(scheduleId);
+
+ try {
+ List<ExecutableFlow> executables = executorManager.getExecutableFlows(schedule.getProjectId(), schedule.getFlowName(), 0, STAT_NUMBERS, Status.SUCCEEDED);
+
+ long average = 0;
+ long min = Integer.MAX_VALUE;
+ long max = 0;
+ if (executables.isEmpty()) {
+ average = 0;
+ min = 0;
+ max = 0;
+ }
+ else {
+ for (ExecutableFlow flow : executables) {
+ long time = flow.getEndTime() - flow.getStartTime();
+ average += time;
+ if (time < min) {
+ min = time;
+ }
+ if (time > max) {
+ max = time;
+ }
+ }
+ average /= executables.size();
+ }
+
+ data.put("average", average);
+ data.put("min", min);
+ data.put("max", max);
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ }
+
+ return data;
+ }
+
+ public static void invalidateCache(int scheduleId) {
+ // This should be silent and not fail
+ try {
+ Object lock = getLock(scheduleId);
+ synchronized (lock) {
+ getCacheFile(scheduleId).delete();
+ }
+ unLock(scheduleId);
+ } catch (Exception e) {
+ }
+ }
+
+ private static void saveCache(int scheduleId, Map<String, Object> data) {
+ Object lock = getLock(scheduleId);
+ try {
+ synchronized (lock) {
+ File cache = getCacheFile(scheduleId);
+ cache.createNewFile();
+ OutputStream output = new FileOutputStream(cache);
+ JSONUtils.toJSON(data, output, false);
+ output.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ unLock(scheduleId);
+ }
+
+ private static Map<String, Object> loadCache(int scheduleId) {
+ Object lock = getLock(scheduleId);
+ try {
+ synchronized (lock) {
+ File cache = getCacheFile(scheduleId);
+ if (cache.exists() && cache.isFile()) {
+ Object dataObj = JSONUtils.parseJSONFromFile(cache);
+ if (dataObj instanceof Map<?, ?>) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> data = (Map<String, Object>) dataObj;
+ return data;
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ unLock(scheduleId);
+ return null;
+ }
+
+ private static File getCacheFile(int scheduleId) {
+ cacheDirectory.mkdirs();
+ File file = new File(cacheDirectory, scheduleId + ".cache");
+ return file;
+ }
+
+ private static Object getLock(int scheduleId) {
+ Object lock = null;
+ synchronized (cacheLock) {
+ lock = cacheLock.get(scheduleId);
+ if (lock == null) {
+ lock = new Object();
+ cacheLock.put(scheduleId, lock);
+ }
+ }
+
+ return lock;
+ }
+
+ private static void unLock(int scheduleId) {
+ synchronized (cacheLock) {
+ cacheLock.remove(scheduleId);
+ }
+ }
+}
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 0092999..312b7da 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -53,6 +53,7 @@ import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduleManagerException;
+import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.sla.SLA;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
@@ -373,6 +374,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+ Map<String, Object> stats = ScheduleStatisticManager.getStatistics(schedule.getScheduleId(), (AzkabanWebServer) getApplication());
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("scheduleid", schedule.getScheduleId());
data.put("flowname", schedule.getFlowName());
@@ -381,13 +383,20 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
DateTime time = DateTime.now();
long period = 0;
- if(schedule.getPeriod() != null){
+ if (schedule.getPeriod() != null) {
period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
}
data.put("period", period);
- data.put("length", 2 * 3600 * 1000);
+ int length = 3600 * 1000;
+ if (stats.get("average") != null && stats.get("average") instanceof Integer) {
+ length = (int) (Integer) stats.get("average");
+ if (length == 0) {
+ length = 3600 * 1000;
+ }
+ }
+ data.put("length", length);
data.put("history", false);
-
+ data.put("stats", stats);
output.add(data);
}
@@ -407,7 +416,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
int loadAll = getIntParam(req, "loadAll");
// Cache file
- File cache = new File("cache/" + startTime + ".cache");
+ File cache = new File("cache/schedule-history/" + startTime + ".cache");
cache.getParentFile().mkdirs();
if (useCache) {
@@ -453,7 +462,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
//Create cache file
- File cacheTemp = new File("cache/" + startTime + ".tmp");
+ File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
cacheTemp.createNewFile();
OutputStream cacheOutput = new FileOutputStream(cacheTemp);
src/web/js/azkaban.schedule.svg.js 6(+4 -2)
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index 5a840f9..e9e583c 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -432,10 +432,12 @@ $(function() {
{
var items = data.items;
+ console.log(data);
+
//Sort items by day
for(var i = 0; i < items.length; i++)
{
- items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
+ //items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
items[i].objs = new Array();
items[i].color = "#69F";
processItem(items[i], true);
@@ -448,7 +450,7 @@ $(function() {
$.ajax({
type: "GET",
url: requestURL,
- data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 1},
+ data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 0},
//dataType: "json",
success: function (data)
{
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6510ae3..2b81f82 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -11,6 +11,7 @@ import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -193,5 +194,11 @@ public class MockExecutorLoader implements ExecutorLoader {
return 0;
}
+ @Override
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
\ No newline at end of file