azkaban-memoizeit

Merge pull request #21 from lishd/master Added history viewing

5/23/2013 5:56:24 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index c0108e2..97581e9 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -35,6 +35,8 @@ public interface ExecutorLoader {
 
 	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException;
 
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException;
+
 	public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startData, long endData, int skip, int num) throws ExecutorManagerException;
 
 	public void addActiveExecutableReference(ExecutionReference ref) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 07b81a8..527956d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.project.Project;
+import azkaban.scheduler.ScheduleStatisticManager;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
@@ -661,6 +662,9 @@ public class ExecutorManager {
 						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
+							if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
+								ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+							}
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
@@ -942,7 +946,11 @@ public class ExecutorManager {
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
-	
+
+	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
+		return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+	}
+
 	/* 
 	 * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
 	 * 
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 5c907a9..3c7332d 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -203,6 +203,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+		QueryRunner runner = createQueryRunner();
+		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+
+		try {
+			List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, flowHandler, projectId, flowId, status.getNumVal(), skip, num);
+			return properties;
+		} catch (SQLException e) {
+			throw new ExecutorManagerException("Error fetching flow history by status", e);
+		}
+	}
+	
+	@Override
 	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 
@@ -816,13 +829,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
 		private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
 		private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
+		private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
 		
 		@Override
 		public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
 			if (!rs.next()) {
 				return Collections.<ExecutableFlow>emptyList();
 			}
-
+			
 			List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
 			do {
 				int id = rs.getInt(1);
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
new file mode 100644
index 0000000..28fc11c
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -0,0 +1,150 @@
+package azkaban.scheduler;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.AzkabanWebServer;
+
+public class ScheduleStatisticManager {
+	private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
+	private static File cacheDirectory = new File("cache/schedule-statistics");
+	private static final int STAT_NUMBERS = 10;
+
+	public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+		Map<String, Object> data = loadCache(scheduleId);
+		if (data != null) {
+			return data;
+		}
+
+		// Calculate data and cache it
+		data = calculateStats(scheduleId, server);
+
+		saveCache(scheduleId, data);
+
+		return data;
+	}
+
+	private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) {
+		Map<String, Object> data = new HashMap<String, Object>();
+		ExecutorManager executorManager = server.getExecutorManager();
+		ScheduleManager scheduleManager = server.getScheduleManager();
+		Schedule schedule = scheduleManager.getSchedule(scheduleId);
+
+		try {
+			List<ExecutableFlow> executables = executorManager.getExecutableFlows(schedule.getProjectId(), schedule.getFlowName(), 0, STAT_NUMBERS, Status.SUCCEEDED);
+
+			long average = 0;
+			long min = Integer.MAX_VALUE;
+			long max = 0;
+			if (executables.isEmpty()) {
+				average = 0;
+				min = 0;
+				max = 0;
+			}
+			else {
+				for (ExecutableFlow flow : executables) {
+					long time = flow.getEndTime() - flow.getStartTime();
+					average += time;
+					if (time < min) {
+						min = time;
+					}
+					if (time > max) {
+						max = time;
+					}
+				}
+				average /= executables.size();
+			}
+
+			data.put("average", average);
+			data.put("min", min);
+			data.put("max", max);
+		} catch (ExecutorManagerException e) {
+			e.printStackTrace();
+		}
+
+		return data;
+	}
+
+	public static void invalidateCache(int scheduleId) {
+		// This should be silent and not fail
+		Object lock = getLock(scheduleId);
+		try {
+			synchronized (lock) {
+				getCacheFile(scheduleId).delete();
+			}
+		} catch (Exception e) {
+			// Best-effort invalidation: deliberately ignored.
+		} finally {
+			// Always release, even if deletion threw, so the lock map cannot leak.
+			unLock(scheduleId);
+		}
+	}
+
+	private static void saveCache(int scheduleId, Map<String, Object> data) {
+		Object lock = getLock(scheduleId);
+		try {
+			synchronized (lock) {
+				File cache = getCacheFile(scheduleId);
+				cache.createNewFile();
+				OutputStream output = new FileOutputStream(cache);
+				try {
+					JSONUtils.toJSON(data, output, false);
+				} finally {
+					// Close the stream even if serialization fails.
+					output.close();
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+			unLock(scheduleId);
+		}
+	}
+
+	private static Map<String, Object> loadCache(int scheduleId) {
+		Object lock = getLock(scheduleId);
+		try {
+			synchronized (lock) {
+				File cache = getCacheFile(scheduleId);
+				if (cache.exists() && cache.isFile()) {
+					Object dataObj = JSONUtils.parseJSONFromFile(cache);
+					if (dataObj instanceof Map<?, ?>) {
+						@SuppressWarnings("unchecked")
+						Map<String, Object> data = (Map<String, Object>) dataObj;
+						return data;
+					}
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+			// Previously the cache-hit early return skipped unLock(), leaking an
+			// entry in the static cacheLock map for every schedule ever read.
+			unLock(scheduleId);
+		}
+		return null;
+	}
+
+	private static File getCacheFile(int scheduleId) {
+		cacheDirectory.mkdirs();
+		File file = new File(cacheDirectory, scheduleId + ".cache");
+		return file;
+	}
+
+	private static Object getLock(int scheduleId) {
+		Object lock = null;
+		synchronized (cacheLock) {
+			lock = cacheLock.get(scheduleId);
+			if (lock == null) {
+				lock = new Object();
+				cacheLock.put(scheduleId, lock);
+			}
+		}
+
+		return lock;
+	}
+
+	private static void unLock(int scheduleId) {
+		synchronized (cacheLock) {
+			cacheLock.remove(scheduleId);
+		}
+	}
+}
diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
new file mode 100644
index 0000000..81821c6
--- /dev/null
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -0,0 +1,53 @@
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SplitterOutputStream extends OutputStream {
+	List<OutputStream> outputs;
+
+	public SplitterOutputStream(OutputStream... outputs) {
+		this.outputs = new ArrayList<OutputStream>(outputs.length);
+		for (OutputStream output : outputs) {
+			this.outputs.add(output);
+		}
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		for (OutputStream output : outputs) {
+			output.write(b);
+		}
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		for (OutputStream output : outputs) {
+			output.write(b);
+		}
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		for (OutputStream output : outputs) {
+			output.write(b, off, len);
+		}
+	}
+
+	@Override
+	public void flush() throws IOException {
+		for (OutputStream output : outputs) {
+			output.flush();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		// Attempt to close every stream even if one fails; rethrow the first failure.
+		IOException firstFailure = null;
+		for (OutputStream output : outputs) {
+			try {
+				output.close();
+			} catch (IOException e) {
+				if (firstFailure == null) {
+					firstFailure = e;
+				}
+			}
+		}
+		if (firstFailure != null) {
+			throw firstFailure;
+		}
+	}
+
+}
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index fb4419f..03b1bab 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -277,4 +277,12 @@ public class Utils {
 		Method method = clazz.getDeclaredMethod(methodName, argTypes);
 		return method.invoke(null, args);
 	}
+	
+	public static void copyStream(InputStream input, OutputStream output) throws IOException {
+		byte[] buffer = new byte[1024];
+		int bytesRead;
+		while ((bytesRead = input.read(buffer)) != -1) {
+			output.write(buffer, 0, bytesRead);
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 50afa9b..4722257 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,7 +16,12 @@
 
 package azkaban.webapp.servlet;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,25 +41,32 @@ import org.joda.time.Minutes;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
+import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
-import azkaban.project.ProjectManager;
 import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.user.Permission;
-import azkaban.user.User;
-import azkaban.user.Permission.Type;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.session.Session;
+import azkaban.project.ProjectManager;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.ScheduleManagerException;
+import azkaban.scheduler.ScheduleStatisticManager;
 import azkaban.sla.SLA;
-import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
 import azkaban.sla.SlaOptions;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.SplitterOutputStream;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
 
 public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
@@ -97,6 +109,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		else if(ajaxName.equals("loadFlow")) {
 			ajaxLoadFlows(req, ret, session.getUser());
 		}
+		else if(ajaxName.equals("loadHistory")) {
+			ajaxLoadHistory(req, resp, session.getUser());
+			ret = null;
+		}
 		else if(ajaxName.equals("scheduleFlow")) {
 			ajaxScheduleFlow(req, ret, session.getUser());
 		}
@@ -349,7 +365,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		if (schedules.size() <= 0)
 			return;
 
-		List<HashMap<String, String>> output = new ArrayList<HashMap<String, String>>();
+		List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
 		ret.put("items", output);
 
 		for (Schedule schedule : schedules) {
@@ -357,19 +373,126 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void writeScheduleData(List<HashMap<String, String>> output, Schedule schedule) {
-		HashMap<String, String> data = new HashMap<String, String>();
+	private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+		Map<String, Object> stats = ScheduleStatisticManager.getStatistics(schedule.getScheduleId(), (AzkabanWebServer) getApplication());
+		HashMap<String, Object> data = new HashMap<String, Object>();
+		data.put("scheduleid", schedule.getScheduleId());
 		data.put("flowname", schedule.getFlowName());
 		data.put("projectname", schedule.getProjectName());
-		data.put("time", Long.toString(schedule.getFirstSchedTime()));
+		data.put("time", schedule.getFirstSchedTime());
 
 		DateTime time = DateTime.now();
 		long period = 0;
-		if(schedule.getPeriod() != null){
+		if (schedule.getPeriod() != null) {
 			period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
 		}
-		data.put("period", Long.toString(period));
-		data.put("length", Long.toString(2 * 3600 * 1000));
+		data.put("period", period);
+		int length = 3600 * 1000;
+		// "average" is stored as a boxed Long (and may round-trip through JSON as
+		// an Integer), so accept any Number; the old "instanceof Integer" check
+		// almost never matched and the computed average was silently discarded.
+		if (stats.get("average") instanceof Number) {
+			length = ((Number) stats.get("average")).intValue();
+			if (length == 0) {
+				length = 3600 * 1000;
+			}
+		}
+		data.put("length", length);
+		data.put("history", false);
+		data.put("stats", stats);
+		output.add(data);
+	}
+	
+	private void ajaxLoadHistory(HttpServletRequest req, HttpServletResponse resp, User user) throws ServletException, IOException {
+		resp.setContentType(JSON_MIME_TYPE);
+		long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
+		long startTime = getLongParam(req, "startTime");
+		DateTime start = new DateTime(startTime);
+		// Ensure start time is 12:00 AM
+		startTime = start.withTime(0, 0, 0, 0).getMillis();
+		boolean useCache = false;
+		if (startTime < today) {
+			useCache = true;
+		}
+		long endTime = startTime + 24 * 3600 * 1000;
+		// long endTime = getLongParam(req, "endTime");
+		int loadAll = getIntParam(req, "loadAll");
+
+		// Cache file
+		File cache = new File("cache/schedule-history/" + startTime + ".cache");
+		cache.getParentFile().mkdirs();
+
+		if (useCache) {
+			// Determine if cache exists
+			boolean cacheExists = false;
+			synchronized (this) {
+				cacheExists = cache.exists() && cache.isFile();
+			}
+			if (cacheExists) {
+				// Send the cache instead
+				InputStream cacheInput = new FileInputStream(cache);
+				try {
+					Utils.copyStream(cacheInput, resp.getOutputStream());
+				} finally {
+					// Close the cache file handle; it was previously leaked.
+					cacheInput.close();
+				}
+				return;
+			}
+		}
+
+		// Load data if not cached
+		// Start with an empty list so a fetch failure below (which is swallowed)
+		// does not leave history null and NPE in the for-each loop.
+		List<ExecutableFlow> history = new ArrayList<ExecutableFlow>();
+		try {
+			AzkabanWebServer server = (AzkabanWebServer) getApplication();
+			ExecutorManager executorManager = server.getExecutorManager();
+			history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
+		} catch (ExecutorManagerException e) {
+			// Return empty should suffice
+		}
+		HashMap<String, Object> ret = new HashMap<String, Object>();
+		List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
+		ret.put("items", output);
+		for (ExecutableFlow historyItem : history) {
+			// Check if it is an scheduled execution
+			if (historyItem.getScheduleId() >= 0 || loadAll != 0) {
+				writeHistoryData(output, historyItem);
+			}
+		}
+
+		// Make sure we're ready to cache it, otherwise output and return
+		synchronized (this) {
+			if (!useCache || cache.exists()) {
+				JSONUtils.toJSON(ret, resp.getOutputStream(), false);
+				return;
+			}
+		}
+		
+		//Create cache file
+		File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
+		cacheTemp.createNewFile();
+		OutputStream cacheOutput = new FileOutputStream(cacheTemp);
+
+		// Write to both the cache file and web output; ensure the cache file
+		// handle is closed so the rename below cannot act on an open file.
+		try {
+			JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
+		} finally {
+			cacheOutput.close();
+		}
+		// System.out.println("Writing cache file for " + start);
+		// JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+		
+		//Move cache file
+		synchronized (this) {
+			cacheTemp.renameTo(cache);
+		}
+	}
+
+	private void writeHistoryData(List<HashMap<String, Object>> output, ExecutableFlow history) {
+		HashMap<String, Object> data = new HashMap<String, Object>();
+
+		data.put("scheduleid", history.getScheduleId());
+		Project project = projectManager.getProject(history.getProjectId());
+		data.put("flowname", history.getFlowId());
+		data.put("projectname", project.getName());
+		data.put("time", history.getStartTime());
+		data.put("period", 0); // numeric, consistent with writeScheduleData
+		long endTime = history.getEndTime();
+		if(endTime == -1){
+			endTime = System.currentTimeMillis();
+		}
+		data.put("length", endTime - history.getStartTime());
+		data.put("history", true);
+		data.put("status", history.getStatus().getNumVal());
 
 		output.add(data);
 	}
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index d272251..e9e583c 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -123,21 +123,26 @@ $(function() {
 					}
 				}
 
+				var gDayViewOuterGroup = svg.group(gMain);
+				var gDayView = svg.group(gDayViewOuterGroup, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
 				if(isThisWeek != -1)
 				{
 					var date = new Date(firstDay + dayMillisConst * isThisWeek);
 					var day = date.getDate();
 					var gDay = svg.group(gMain, {transform: "translate(" + (border + timeWidth + isThisWeek * dayWidth) + "," + header + ")"});
 					svg.rect(gDay, 0, -header, dayWidth, 24 * lineHeight + header, {fill : "none", stroke : "#06F"});
+					var lineY = Math.floor(today.getHours() * lineHeight + today.getMinutes() * lineHeight / 60);
+					svg.line(gDay, 0, lineY, dayWidth, lineY, {fill : "none", stroke : "#06F", strokeWidth : 4});
 				}
 
-				var gDayView = svg.group(gMain, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
 				//A list of all items
 				var itemByDay = new Array();
 				for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
 					itemByDay[deltaDay] = new Array();
 				}
 
+				var itemByScheduleIdMap = {};
+
 				function filterApplies(item) {
 					for(var i = 0; i < filterProject.length; i++) {
 						if(item.projectname == filterProject[i].projectname) {
@@ -156,7 +161,7 @@ $(function() {
 				function renderDays() {
 					//Clear items inside the day view
 					svg.remove(gDayView);
-					gDayView = svg.group(gMain, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
+					gDayView = svg.group(gDayViewOuterGroup, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
 
 					//Add day groups
 					for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
@@ -214,7 +219,11 @@ $(function() {
 								var startTime = new Date(item.time);
 								var startY = Math.floor(startTime.getHours() * lineHeight + startTime.getMinutes() * lineHeight / 60);
 								var endTime = new Date(item.time + item.length );
-								var endY = Math.floor(startY + (item.length * lineHeight) / hourMillisConst);
+								var endY = Math.ceil(startY + (item.length * lineHeight) / hourMillisConst);
+								var deltaY = Math.ceil(endY - startY);
+								if(deltaY < 5){
+									deltaY = 5;
+								}
 								//var anchor = svg.a(gColumn);
 								var itemUrl = contextURL + "/manager?project=" + item.projectname + "&flow=" + item.flowname;
 								var gItem = svg.link(gColumn, itemUrl, {transform: "translate(0," + startY + ")"});
@@ -231,7 +240,7 @@ $(function() {
 								gItem.addEventListener('mouseout', handleMouseOut);
 
 								//$(gItem).attr("style","color:red");
-								var rect = svg.rect(gItem, 0, 0, Math.ceil(dayWidth / columns.length), Math.floor(endY - startY), 0, 0, {fill : "#7E7", stroke : "#444", strokeWidth : 1});
+								var rect = svg.rect(gItem, 0, 0, Math.ceil(dayWidth / columns.length), deltaY, 0, 0, {fill : item.item.color, stroke : "#444", strokeWidth : 1});
 								
 								item.rect = rect;
 								//Draw text
@@ -241,25 +250,29 @@ $(function() {
 					}
 				}
 
-				function processItem(item)
+				function processItem(item, scheduled)
 				{
 					var firstTime = item.time;
 					var startTime = firstDay;
 					var endTime = firstDay + numDays * dayMillisConst;
 					var period = item.period;
+					var restrictedStartTime = Math.max(firstDay, today.valueOf());
+					if(!scheduled){
+						restrictedStartTime = firstDay;
+					}
 
 					// Shift time until we're past the start time
 					if (period > 0) {
 						// Calculate next execution time efficiently
 						// Take into account items that ends in the date specified, but does not start on that date
-						var periods = Math.floor((startTime - (firstTime + item.length)) / period);
+						var periods = Math.floor((restrictedStartTime - (firstTime)) / period);
 						//Make sure we don't subtract
 						if(periods < 0){
 							periods = 0;
 						}
 						firstTime += period * periods;
 						// Increment in case we haven't arrived yet. This will apply to most of the cases
-						while (firstTime + item.length < startTime) {
+						while (firstTime < restrictedStartTime) {
 							firstTime += period;
 						}
 					}
@@ -267,18 +280,18 @@ $(function() {
 					// Bad or no period
 					if (period <= 0) {
 						// Single instance case
-						if (firstTime >= startTime && firstTime < endTime) {
-							addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
+						if (firstTime >= restrictedStartTime && firstTime < endTime) {
+							addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
 						}
 					}
 					else {
 						if(period <= hourMillisConst) {
-							addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: endTime - firstTime, item: item});
+							addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: endTime - firstTime, item: item});
 						}
 						else{
 							// Repetitive schedule, firstTime is assumed to be after startTime
 							while (firstTime < endTime) {
-								addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
+								addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
 								firstTime += period;
 							}
 						}
@@ -305,7 +318,7 @@ $(function() {
 								tempLength = dayMillisConst;
 							}
 
-							var item2 = {time: nextMorning, length: tempLength, projectname: obj.projectname, flowname: obj.flowname, item: obj.item};
+							var item2 = {scheduleid: obj.scheduleid, time: nextMorning, length: tempLength, projectname: obj.projectname, flowname: obj.flowname, item: obj.item};
 							addItem(item2);
 							excess -= tempLength;
 							nextMorning += dayMillisConst;
@@ -318,7 +331,12 @@ $(function() {
 					{
 						//Add the item to the rendering list
 						itemByDay[index].push(obj);
-						obj.item.objs.push(obj);
+						//obj.item.objs.push(obj);
+
+						if(!itemByScheduleIdMap[obj.scheduleid]){
+							itemByScheduleIdMap[obj.scheduleid] = new Array();
+						}
+						itemByScheduleIdMap[obj.scheduleid].push(obj);
 					}
 				}
 
@@ -350,27 +368,54 @@ $(function() {
 						"\"" + obj.flowname + "\" from \"" + obj.projectname + "\"",
 						"Repeat: " + formatReadablePeriod(obj.item.period)
 					];
+
+					if(obj.item.period == 0){
+						text[1] = "";
+						if(obj.item.history == true) {
+							if(obj.item.status == 50){
+								text[1] = "SUCCEEDED";
+							}
+							else if(obj.item.status == 60){
+								text[1] = "KILLED";
+							}
+							else if(obj.item.status == 70){
+								text[1] = "FAILED";
+							}
+							else if(obj.item.status == 80){
+								text[1] = "FAILED_FINISHING";
+							}
+							else if(obj.item.status == 90){
+								text[1] = "SKIPPED";
+							}
+						}
+					}
 					var textLength = Math.max(measureText(svg, text[0], {fontSize: "13"}), measureText(svg, text[1], {fontSize: "13"}));
 					var rect = svg.rect(tooltip, 0, -40, textLength + 4, 40, {fill : "#FFF", stroke : "none"});
 					svg.text(tooltip, 2, -25, text[0], {fontSize: "13", fill : "#000", stroke : "none"});
 					svg.text(tooltip, 2, -5, text[1], {fontSize: "13", fill : "#000", stroke : "none"});
 
-					//Item highlight effect
-					for(var i = 0; i < obj.item.objs.length; i++) {
-						var obj2 = obj.item.objs[i];
-						$(obj2.rect).attr("fill", "#F00");
-					}
-
 					//Store tooltip
 					$(this).data("tooltip", tooltip);
+
+					if(itemByScheduleIdMap[obj.scheduleid]){
+						//Item highlight effect
+						var arry = itemByScheduleIdMap[obj.scheduleid];
+						for(var i = 0; i < arry.length; i++) {
+							$(arry[i].rect).attr("fill", "#FF0");
+						}
+					}
 				}
 
 				function handleMouseOut(event) {
 					//Item highlight effect
 					var obj = $(this).data("item");
-					for(var i = 0; i < obj.item.objs.length; i++) {
-						var obj2 = obj.item.objs[i];
-						$(obj2.rect).attr("fill", "#7E7");
+						//Item highlight effect
+					if(itemByScheduleIdMap[obj.scheduleid]){
+						var arry = itemByScheduleIdMap[obj.scheduleid];
+						for(var i = 0; i < arry.length; i++) {
+							// restore each linked item's original color
+							$(arry[i].rect).attr("fill", arry[i].item.color);
+						}
 					}
 					//Clear the fade interval
 					$($(this).data("tooltip")).fadeOut(250, function(){ svg.remove(this); });
@@ -387,19 +432,45 @@ $(function() {
 					{
 						var items = data.items;
 
+						console.log(data);
+
 						//Sort items by day
 						for(var i = 0; i < items.length; i++)
 						{
-							items[i].length = hourMillisConst; //TODO: parseInt(items[i].length);
-							items[i].time = parseInt(items[i].time);
-							items[i].period = parseInt(items[i].period);
+							//items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
 							items[i].objs = new Array();
-							processItem(items[i]);
+							items[i].color = "#69F";
+							processItem(items[i], true);
 						}
 						//Trigger a re-rendering of all the data
 						renderDays();
 					}
 				});
+				for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
+					$.ajax({
+						type: "GET",
+						url: requestURL,
+						data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 0},
+						//dataType: "json",
+						success: function (data)
+						{
+							var items = data.items;
+
+							//Sort items by day
+							for(var i = 0; i < items.length; i++)
+							{
+								//if(items[i].length < 5 * 60 * 1000) items[i].length = 5 * 60 * 1000;
+								items[i].objs = new Array();
+								items[i].color = "#7E7";
+								if(items[i].status == 60 || items[i].status == 70 || items[i].status == 80)
+									items[i].color = "#E77";
+								processItem(items[i], false);
+							}
+							//Trigger a re-rendering of all the data
+							renderDays();
+						}
+					});
+				}
 			}
 		}, settings : {
 			"xmlns" : "http://www.w3.org/2000/svg", 
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6510ae3..2b81f82 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -11,6 +11,7 @@ import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionReference;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -193,5 +194,11 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return 0;
 	}
 
+	@Override
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
 
 }
\ No newline at end of file