azkaban-memoizeit
Changes
src/java/azkaban/utils/Utils.java 8(+8 -0)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 151(+137 -14)
src/web/js/azkaban.schedule.svg.js 123(+97 -26)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index b449654..152a546 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -266,6 +266,7 @@ public class ExecutableFlow {
flowObj.put("executionPath", executionPath);
flowObj.put("flowId", flowId);
flowObj.put("projectId", projectId);
+
if(scheduleId >= 0) {
flowObj.put("scheduleId", scheduleId);
}
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index c0108e2..97581e9 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -35,6 +35,8 @@ public interface ExecutorLoader {
public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException;
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException;
+
public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startData, long endData, int skip, int num) throws ExecutorManagerException;
public void addActiveExecutableReference(ExecutionReference ref) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 07b81a8..527956d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.project.Project;
+import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
@@ -661,6 +662,9 @@ public class ExecutorManager {
evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
// Add new finished
for (ExecutableFlow flow: finishedFlows) {
+ if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+ }
recentlyFinished.put(flow.getExecutionId(), flow);
}
@@ -942,7 +946,11 @@ public class ExecutorManager {
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
-
+
+ public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
+ return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+ }
+
/*
* cleaner thread to clean up execution_logs, etc in DB. Runs every day.
*
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 33a9f76..3c7332d 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -203,6 +203,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+
+ try {
+ List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, flowHandler, projectId, flowId, status.getNumVal(), skip, num);
+ return properties;
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
+ @Override
public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
@@ -223,13 +236,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
ArrayList<Object> params = new ArrayList<Object>();
boolean first = true;
- if (projContain != null) {
+ if (projContain != null && !projContain.isEmpty()) {
query += " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
params.add('%'+projContain+'%');
first = false;
}
- if (flowContains != null) {
+ if (flowContains != null && !flowContains.isEmpty()) {
if (first) {
query += " WHERE ";
first = false;
@@ -242,7 +255,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
params.add('%'+flowContains+'%');
}
- if (userNameContains != null) {
+ if (userNameContains != null && !userNameContains.isEmpty()) {
if (first) {
query += " WHERE ";
first = false;
@@ -816,13 +829,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
+ private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
@Override
public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return Collections.<ExecutableFlow>emptyList();
}
-
+
List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
do {
int id = rs.getInt(1);
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 00c4f46..c60f143 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -270,15 +270,17 @@ public class ScheduleManager {
* @param flow
*/
public synchronized void insertSchedule(Schedule s) {
- boolean exist = scheduleIdentityPairMap.containsKey(s.getScheduleIdentityPair());
+ boolean exist = s.getScheduleId() != -1;
if(s.updateTime()) {
- internalSchedule(s);
try {
if(!exist) {
loader.insertSchedule(s);
+ internalSchedule(s);
}
- else
+ else{
loader.updateSchedule(s);
+ internalSchedule(s);
+ }
} catch (ScheduleManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -407,6 +409,7 @@ public class ScheduleManager {
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
+ System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
exflow.setScheduleId(runningSched.getScheduleId());
exflow.setSubmitUser(runningSched.getSubmitUser());
exflow.addAllProxyUsers(project.getProxyUsers());
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
new file mode 100644
index 0000000..28fc11c
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -0,0 +1,150 @@
+package azkaban.scheduler;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.AzkabanWebServer;
+
+public class ScheduleStatisticManager {
+ private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
+ private static File cacheDirectory = new File("cache/schedule-statistics");
+ private static final int STAT_NUMBERS = 10;
+
+ public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+ Map<String, Object> data = loadCache(scheduleId);
+ if (data != null) {
+ return data;
+ }
+
+ // Calculate data and cache it
+ data = calculateStats(scheduleId, server);
+
+ saveCache(scheduleId, data);
+
+ return data;
+ }
+
+ private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) {
+ Map<String, Object> data = new HashMap<String, Object>();
+ ExecutorManager executorManager = server.getExecutorManager();
+ ScheduleManager scheduleManager = server.getScheduleManager();
+ Schedule schedule = scheduleManager.getSchedule(scheduleId);
+
+ try {
+ List<ExecutableFlow> executables = executorManager.getExecutableFlows(schedule.getProjectId(), schedule.getFlowName(), 0, STAT_NUMBERS, Status.SUCCEEDED);
+
+ long average = 0;
+ long min = Integer.MAX_VALUE;
+ long max = 0;
+ if (executables.isEmpty()) {
+ average = 0;
+ min = 0;
+ max = 0;
+ }
+ else {
+ for (ExecutableFlow flow : executables) {
+ long time = flow.getEndTime() - flow.getStartTime();
+ average += time;
+ if (time < min) {
+ min = time;
+ }
+ if (time > max) {
+ max = time;
+ }
+ }
+ average /= executables.size();
+ }
+
+ data.put("average", average);
+ data.put("min", min);
+ data.put("max", max);
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ }
+
+ return data;
+ }
+
+ public static void invalidateCache(int scheduleId) {
+ // This should be silent and not fail
+ try {
+ Object lock = getLock(scheduleId);
+ synchronized (lock) {
+ getCacheFile(scheduleId).delete();
+ }
+ unLock(scheduleId);
+ } catch (Exception e) {
+ }
+ }
+
+ private static void saveCache(int scheduleId, Map<String, Object> data) {
+ Object lock = getLock(scheduleId);
+ try {
+ synchronized (lock) {
+ File cache = getCacheFile(scheduleId);
+ cache.createNewFile();
+ OutputStream output = new FileOutputStream(cache);
+ JSONUtils.toJSON(data, output, false);
+ output.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ unLock(scheduleId);
+ }
+
+ private static Map<String, Object> loadCache(int scheduleId) {
+ Object lock = getLock(scheduleId);
+ try {
+ synchronized (lock) {
+ File cache = getCacheFile(scheduleId);
+ if (cache.exists() && cache.isFile()) {
+ Object dataObj = JSONUtils.parseJSONFromFile(cache);
+ if (dataObj instanceof Map<?, ?>) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> data = (Map<String, Object>) dataObj;
+ return data;
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ unLock(scheduleId);
+ return null;
+ }
+
+ private static File getCacheFile(int scheduleId) {
+ cacheDirectory.mkdirs();
+ File file = new File(cacheDirectory, scheduleId + ".cache");
+ return file;
+ }
+
+ private static Object getLock(int scheduleId) {
+ Object lock = null;
+ synchronized (cacheLock) {
+ lock = cacheLock.get(scheduleId);
+ if (lock == null) {
+ lock = new Object();
+ cacheLock.put(scheduleId, lock);
+ }
+ }
+
+ return lock;
+ }
+
+ private static void unLock(int scheduleId) {
+ synchronized (cacheLock) {
+ cacheLock.remove(scheduleId);
+ }
+ }
+}
diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
new file mode 100644
index 0000000..81821c6
--- /dev/null
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -0,0 +1,53 @@
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SplitterOutputStream extends OutputStream {
+ List<OutputStream> outputs;
+
+ public SplitterOutputStream(OutputStream... outputs) {
+ this.outputs = new ArrayList<OutputStream>(outputs.length);
+ for (OutputStream output : outputs) {
+ this.outputs.add(output);
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ for (OutputStream output : outputs) {
+ output.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ for (OutputStream output : outputs) {
+ output.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ for (OutputStream output : outputs) {
+ output.write(b, off, len);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ for (OutputStream output : outputs) {
+ output.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (OutputStream output : outputs) {
+ output.close();
+ }
+ }
+
+}
src/java/azkaban/utils/Utils.java 8(+8 -0)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index fb4419f..03b1bab 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -277,4 +277,12 @@ public class Utils {
Method method = clazz.getDeclaredMethod(methodName, argTypes);
return method.invoke(null, args);
}
+
+ public static void copyStream(InputStream input, OutputStream output) throws IOException {
+ byte[] buffer = new byte[1024];
+ int bytesRead;
+ while ((bytesRead = input.read(buffer)) != -1) {
+ output.write(buffer, 0, bytesRead);
+ }
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/HistoryServlet.java b/src/java/azkaban/webapp/servlet/HistoryServlet.java
index ac6fed0..aa6d2ad 100644
--- a/src/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/src/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -120,10 +120,10 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
}
List<ExecutableFlow> history = null;
if(hasParam(req, "advfilter")) {
- String projContain = getParam(req, "projcontain") == "" ? null : getParam(req, "projcontain");
- String flowContain = getParam(req, "flowcontain") == "" ? null : getParam(req, "flowcontain");
- String userContain = getParam(req, "usercontain") == "" ? null : getParam(req, "usercontain");
- int status = getIntParam(req, "status") == 0 ? 0 : getIntParam(req, "status");
+ String projContain = getParam(req, "projcontain");
+ String flowContain = getParam(req, "flowcontain");
+ String userContain = getParam(req, "usercontain");
+ int status = getIntParam(req, "status");
String begin = getParam(req, "begin");
long beginTime = begin == "" ? -1 : DateTimeFormat.forPattern("MM/dd/yyyy-HH:mm").parseDateTime(begin).getMillis();
String end = getParam(req, "end");
@@ -131,7 +131,6 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
try {
history = executorManager.getExecutableFlows(projContain, flowContain, userContain, status, beginTime, endTime, (pageNum - 1)*pageSize, pageSize);
} catch (ExecutorManagerException e) {
- // TODO Auto-generated catch block
page.add("error", e.getMessage());
}
}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 151(+137 -14)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 50afa9b..4722257 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,7 +16,12 @@
package azkaban.webapp.servlet;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -36,25 +41,32 @@ import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
+import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.Project;
-import azkaban.project.ProjectManager;
import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.user.Permission;
-import azkaban.user.User;
-import azkaban.user.Permission.Type;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.session.Session;
+import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduleManagerException;
+import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.sla.SLA;
-import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
import azkaban.sla.SlaOptions;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.SplitterOutputStream;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
@@ -97,6 +109,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
else if(ajaxName.equals("loadFlow")) {
ajaxLoadFlows(req, ret, session.getUser());
}
+ else if(ajaxName.equals("loadHistory")) {
+ ajaxLoadHistory(req, resp, session.getUser());
+ ret = null;
+ }
else if(ajaxName.equals("scheduleFlow")) {
ajaxScheduleFlow(req, ret, session.getUser());
}
@@ -349,7 +365,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
if (schedules.size() <= 0)
return;
- List<HashMap<String, String>> output = new ArrayList<HashMap<String, String>>();
+ List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
ret.put("items", output);
for (Schedule schedule : schedules) {
@@ -357,19 +373,126 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void writeScheduleData(List<HashMap<String, String>> output, Schedule schedule) {
- HashMap<String, String> data = new HashMap<String, String>();
+ private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+ Map<String, Object> stats = ScheduleStatisticManager.getStatistics(schedule.getScheduleId(), (AzkabanWebServer) getApplication());
+ HashMap<String, Object> data = new HashMap<String, Object>();
+ data.put("scheduleid", schedule.getScheduleId());
data.put("flowname", schedule.getFlowName());
data.put("projectname", schedule.getProjectName());
- data.put("time", Long.toString(schedule.getFirstSchedTime()));
+ data.put("time", schedule.getFirstSchedTime());
DateTime time = DateTime.now();
long period = 0;
- if(schedule.getPeriod() != null){
+ if (schedule.getPeriod() != null) {
period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
}
- data.put("period", Long.toString(period));
- data.put("length", Long.toString(2 * 3600 * 1000));
+ data.put("period", period);
+ int length = 3600 * 1000;
+ if (stats.get("average") != null && stats.get("average") instanceof Integer) {
+ length = (int) (Integer) stats.get("average");
+ if (length == 0) {
+ length = 3600 * 1000;
+ }
+ }
+ data.put("length", length);
+ data.put("history", false);
+ data.put("stats", stats);
+ output.add(data);
+ }
+
+ private void ajaxLoadHistory(HttpServletRequest req, HttpServletResponse resp, User user) throws ServletException, IOException {
+ resp.setContentType(JSON_MIME_TYPE);
+ long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
+ long startTime = getLongParam(req, "startTime");
+ DateTime start = new DateTime(startTime);
+ // Ensure start time is 12:00 AM
+ startTime = start.withTime(0, 0, 0, 0).getMillis();
+ boolean useCache = false;
+ if (startTime < today) {
+ useCache = true;
+ }
+ long endTime = startTime + 24 * 3600 * 1000;
+ // long endTime = getLongParam(req, "endTime");
+ int loadAll = getIntParam(req, "loadAll");
+
+ // Cache file
+ File cache = new File("cache/schedule-history/" + startTime + ".cache");
+ cache.getParentFile().mkdirs();
+
+ if (useCache) {
+ // Determine if cache exists
+ boolean cacheExists = false;
+ synchronized (this) {
+ cacheExists = cache.exists() && cache.isFile();
+ }
+ if (cacheExists) {
+ // Send the cache instead
+ InputStream cacheInput = new FileInputStream(cache);
+ Utils.copyStream(cacheInput, resp.getOutputStream());
+ // System.out.println("Using cache copy for " + start);
+ return;
+ }
+ }
+
+ // Load data if not cached
+ List<ExecutableFlow> history = null;
+ try {
+ AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ ExecutorManager executorManager = server.getExecutorManager();
+ history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
+ } catch (ExecutorManagerException e) {
+ // Return empty should suffice
+ }
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
+ ret.put("items", output);
+ for (ExecutableFlow historyItem : history) {
+ // Check if it is an scheduled execution
+ if (historyItem.getScheduleId() >= 0 || loadAll != 0) {
+ writeHistoryData(output, historyItem);
+ }
+ }
+
+ // Make sure we're ready to cache it, otherwise output and return
+ synchronized (this) {
+ if (!useCache || cache.exists()) {
+ JSONUtils.toJSON(ret, resp.getOutputStream(), false);
+ return;
+ }
+ }
+
+ //Create cache file
+ File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
+ cacheTemp.createNewFile();
+ OutputStream cacheOutput = new FileOutputStream(cacheTemp);
+
+ // Write to both the cache file and web output
+ JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
+ // System.out.println("Writing cache file for " + start);
+ // JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+
+ //Move cache file
+ synchronized (this) {
+ cacheTemp.renameTo(cache);
+ }
+ }
+
+ private void writeHistoryData(List<HashMap<String, Object>> output, ExecutableFlow history) {
+ HashMap<String, Object> data = new HashMap<String, Object>();
+
+ data.put("scheduleid", history.getScheduleId());
+ Project project = projectManager.getProject(history.getProjectId());
+ data.put("flowname", history.getFlowId());
+ data.put("projectname", project.getName());
+ data.put("time", history.getStartTime());
+ data.put("period", "0");
+ long endTime = history.getEndTime();
+ if(endTime == -1){
+ endTime = System.currentTimeMillis();
+ }
+ data.put("length", endTime - history.getStartTime());
+ data.put("history", true);
+ data.put("status", history.getStatus().getNumVal());
output.add(data);
}
src/web/js/azkaban.schedule.svg.js 123(+97 -26)
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index d272251..e9e583c 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -123,21 +123,26 @@ $(function() {
}
}
+ var gDayViewOuterGroup = svg.group(gMain);
+ var gDayView = svg.group(gDayViewOuterGroup, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
if(isThisWeek != -1)
{
var date = new Date(firstDay + dayMillisConst * isThisWeek);
var day = date.getDate();
var gDay = svg.group(gMain, {transform: "translate(" + (border + timeWidth + isThisWeek * dayWidth) + "," + header + ")"});
svg.rect(gDay, 0, -header, dayWidth, 24 * lineHeight + header, {fill : "none", stroke : "#06F"});
+ var lineY = Math.floor(today.getHours() * lineHeight + today.getMinutes() * lineHeight / 60);
+ svg.line(gDay, 0, lineY, dayWidth, lineY, {fill : "none", stroke : "#06F", strokeWidth : 4});
}
- var gDayView = svg.group(gMain, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
//A list of all items
var itemByDay = new Array();
for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
itemByDay[deltaDay] = new Array();
}
+ var itemByScheduleIdMap = {};
+
function filterApplies(item) {
for(var i = 0; i < filterProject.length; i++) {
if(item.projectname == filterProject[i].projectname) {
@@ -156,7 +161,7 @@ $(function() {
function renderDays() {
//Clear items inside the day view
svg.remove(gDayView);
- gDayView = svg.group(gMain, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
+ gDayView = svg.group(gDayViewOuterGroup, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
//Add day groups
for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
@@ -214,7 +219,11 @@ $(function() {
var startTime = new Date(item.time);
var startY = Math.floor(startTime.getHours() * lineHeight + startTime.getMinutes() * lineHeight / 60);
var endTime = new Date(item.time + item.length );
- var endY = Math.floor(startY + (item.length * lineHeight) / hourMillisConst);
+ var endY = Math.ceil(startY + (item.length * lineHeight) / hourMillisConst);
+ var deltaY = Math.ceil(endY - startY);
+ if(deltaY < 5){
+ deltaY = 5;
+ }
//var anchor = svg.a(gColumn);
var itemUrl = contextURL + "/manager?project=" + item.projectname + "&flow=" + item.flowname;
var gItem = svg.link(gColumn, itemUrl, {transform: "translate(0," + startY + ")"});
@@ -231,7 +240,7 @@ $(function() {
gItem.addEventListener('mouseout', handleMouseOut);
//$(gItem).attr("style","color:red");
- var rect = svg.rect(gItem, 0, 0, Math.ceil(dayWidth / columns.length), Math.floor(endY - startY), 0, 0, {fill : "#7E7", stroke : "#444", strokeWidth : 1});
+ var rect = svg.rect(gItem, 0, 0, Math.ceil(dayWidth / columns.length), deltaY, 0, 0, {fill : item.item.color, stroke : "#444", strokeWidth : 1});
item.rect = rect;
//Draw text
@@ -241,25 +250,29 @@ $(function() {
}
}
- function processItem(item)
+ function processItem(item, scheduled)
{
var firstTime = item.time;
var startTime = firstDay;
var endTime = firstDay + numDays * dayMillisConst;
var period = item.period;
+ var restrictedStartTime = Math.max(firstDay, today.valueOf());
+ if(!scheduled){
+ restrictedStartTime = firstDay;
+ }
// Shift time until we're past the start time
if (period > 0) {
// Calculate next execution time efficiently
// Take into account items that ends in the date specified, but does not start on that date
- var periods = Math.floor((startTime - (firstTime + item.length)) / period);
+ var periods = Math.floor((restrictedStartTime - (firstTime)) / period);
//Make sure we don't subtract
if(periods < 0){
periods = 0;
}
firstTime += period * periods;
// Increment in case we haven't arrived yet. This will apply to most of the cases
- while (firstTime + item.length < startTime) {
+ while (firstTime < restrictedStartTime) {
firstTime += period;
}
}
@@ -267,18 +280,18 @@ $(function() {
// Bad or no period
if (period <= 0) {
// Single instance case
- if (firstTime >= startTime && firstTime < endTime) {
- addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
+ if (firstTime >= restrictedStartTime && firstTime < endTime) {
+ addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
}
}
else {
if(period <= hourMillisConst) {
- addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: endTime - firstTime, item: item});
+ addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: endTime - firstTime, item: item});
}
else{
// Repetitive schedule, firstTime is assumed to be after startTime
while (firstTime < endTime) {
- addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
+ addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
firstTime += period;
}
}
@@ -305,7 +318,7 @@ $(function() {
tempLength = dayMillisConst;
}
- var item2 = {time: nextMorning, length: tempLength, projectname: obj.projectname, flowname: obj.flowname, item: obj.item};
+ var item2 = {scheduleid: obj.scheduleid, time: nextMorning, length: tempLength, projectname: obj.projectname, flowname: obj.flowname, item: obj.item};
addItem(item2);
excess -= tempLength;
nextMorning += dayMillisConst;
@@ -318,7 +331,12 @@ $(function() {
{
//Add the item to the rendering list
itemByDay[index].push(obj);
- obj.item.objs.push(obj);
+ //obj.item.objs.push(obj);
+
+ if(!itemByScheduleIdMap[obj.scheduleid]){
+ itemByScheduleIdMap[obj.scheduleid] = new Array();
+ }
+ itemByScheduleIdMap[obj.scheduleid].push(obj);
}
}
@@ -350,27 +368,54 @@ $(function() {
"\"" + obj.flowname + "\" from \"" + obj.projectname + "\"",
"Repeat: " + formatReadablePeriod(obj.item.period)
];
+
+ if(obj.item.period == 0){
+ text[1] = "";
+ if(obj.item.history == true) {
+ if(obj.item.status == 50){
+ text[1] = "SUCCEEDED";
+ }
+ else if(obj.item.status == 60){
+ text[1] = "KILLED";
+ }
+ else if(obj.item.status == 70){
+ text[1] = "FAILED";
+ }
+ else if(obj.item.status == 80){
+ text[1] = "FAILED_FINISHING";
+ }
+ else if(obj.item.status == 90){
+ text[1] = "SKIPPED";
+ }
+ }
+ }
var textLength = Math.max(measureText(svg, text[0], {fontSize: "13"}), measureText(svg, text[1], {fontSize: "13"}));
var rect = svg.rect(tooltip, 0, -40, textLength + 4, 40, {fill : "#FFF", stroke : "none"});
svg.text(tooltip, 2, -25, text[0], {fontSize: "13", fill : "#000", stroke : "none"});
svg.text(tooltip, 2, -5, text[1], {fontSize: "13", fill : "#000", stroke : "none"});
- //Item highlight effect
- for(var i = 0; i < obj.item.objs.length; i++) {
- var obj2 = obj.item.objs[i];
- $(obj2.rect).attr("fill", "#F00");
- }
-
//Store tooltip
$(this).data("tooltip", tooltip);
+
+ if(itemByScheduleIdMap[obj.scheduleid]){
+ //Item highlight effect
+ var arry = itemByScheduleIdMap[obj.scheduleid];
+ for(var i = 0; i < arry.length; i++) {
+ $(arry[i].rect).attr("fill", "#FF0");
+ }
+ }
}
function handleMouseOut(event) {
//Item highlight effect
var obj = $(this).data("item");
- for(var i = 0; i < obj.item.objs.length; i++) {
- var obj2 = obj.item.objs[i];
- $(obj2.rect).attr("fill", "#7E7");
+ //Item highlight effect
+ if(itemByScheduleIdMap[obj.scheduleid]){
+ var arry = itemByScheduleIdMap[obj.scheduleid];
+ for(var i = 0; i < arry.length; i++) {
+ var obj2 = obj.item.objs[i];
+ $(arry[i].rect).attr("fill", arry[i].item.color);
+ }
}
//Clear the fade interval
$($(this).data("tooltip")).fadeOut(250, function(){ svg.remove(this); });
@@ -387,19 +432,45 @@ $(function() {
{
var items = data.items;
+ console.log(data);
+
//Sort items by day
for(var i = 0; i < items.length; i++)
{
- items[i].length = hourMillisConst; //TODO: parseInt(items[i].length);
- items[i].time = parseInt(items[i].time);
- items[i].period = parseInt(items[i].period);
+ //items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
items[i].objs = new Array();
- processItem(items[i]);
+ items[i].color = "#69F";
+ processItem(items[i], true);
}
//Trigger a re-rendering of all the data
renderDays();
}
});
+ for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
+ $.ajax({
+ type: "GET",
+ url: requestURL,
+ data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 0},
+ //dataType: "json",
+ success: function (data)
+ {
+ var items = data.items;
+
+ //Sort items by day
+ for(var i = 0; i < items.length; i++)
+ {
+ //if(items[i].length < 5 * 60 * 1000) items[i].length = 5 * 60 * 1000;
+ items[i].objs = new Array();
+ items[i].color = "#7E7";
+ if(items[i].status == 60 || items[i].status == 70 || items[i].status == 80)
+ items[i].color = "#E77";
+ processItem(items[i], false);
+ }
+ //Trigger a re-rendering of all the data
+ renderDays();
+ }
+ });
+ }
}
}, settings : {
"xmlns" : "http://www.w3.org/2000/svg",
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6510ae3..2b81f82 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -11,6 +11,7 @@ import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -193,5 +194,11 @@ public class MockExecutorLoader implements ExecutorLoader {
return 0;
}
+ @Override
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
\ No newline at end of file