azkaban-memoizeit

Details

diff --git a/src/java/azkaban/utils/SplitterOutputStream.java b/src/java/azkaban/utils/SplitterOutputStream.java
new file mode 100644
index 0000000..81821c6
--- /dev/null
+++ b/src/java/azkaban/utils/SplitterOutputStream.java
@@ -0,0 +1,53 @@
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SplitterOutputStream extends OutputStream {
+	List<OutputStream> outputs;
+
+	public SplitterOutputStream(OutputStream... outputs) {
+		this.outputs = new ArrayList<OutputStream>(outputs.length);
+		for (OutputStream output : outputs) {
+			this.outputs.add(output);
+		}
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		for (OutputStream output : outputs) {
+			output.write(b);
+		}
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		for (OutputStream output : outputs) {
+			output.write(b);
+		}
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		for (OutputStream output : outputs) {
+			output.write(b, off, len);
+		}
+	}
+
+	@Override
+	public void flush() throws IOException {
+		for (OutputStream output : outputs) {
+			output.flush();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		for (OutputStream output : outputs) {
+			output.close();
+		}
+	}
+
+}
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index fb4419f..03b1bab 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -277,4 +277,12 @@ public class Utils {
 		Method method = clazz.getDeclaredMethod(methodName, argTypes);
 		return method.invoke(null, args);
 	}
+	
+	public static void copyStream(InputStream input, OutputStream output) throws IOException {
+		byte[] buffer = new byte[1024];
+		int bytesRead;
+		while ((bytesRead = input.read(buffer)) != -1) {
+			output.write(buffer, 0, bytesRead);
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 1807af6..0092999 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,7 +16,12 @@
 
 package azkaban.webapp.servlet;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,25 +41,31 @@ import org.joda.time.Minutes;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
+import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
-import azkaban.project.ProjectManager;
 import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.user.Permission;
-import azkaban.user.User;
-import azkaban.user.Permission.Type;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.session.Session;
+import azkaban.project.ProjectManager;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.ScheduleManagerException;
 import azkaban.sla.SLA;
-import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
 import azkaban.sla.SlaOptions;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.SplitterOutputStream;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
 
 public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
@@ -97,6 +108,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		else if(ajaxName.equals("loadFlow")) {
 			ajaxLoadFlows(req, ret, session.getUser());
 		}
+		else if(ajaxName.equals("loadHistory")) {
+			ajaxLoadHistory(req, resp, session.getUser());
+			ret = null;
+		}
 		else if(ajaxName.equals("scheduleFlow")) {
 			ajaxScheduleFlow(req, ret, session.getUser());
 		}
@@ -349,7 +364,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		if (schedules.size() <= 0)
 			return;
 
-		List<HashMap<String, String>> output = new ArrayList<HashMap<String, String>>();
+		List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
 		ret.put("items", output);
 
 		for (Schedule schedule : schedules) {
@@ -357,19 +372,118 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void writeScheduleData(List<HashMap<String, String>> output, Schedule schedule) {
-		HashMap<String, String> data = new HashMap<String, String>();
+	private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+		HashMap<String, Object> data = new HashMap<String, Object>();
+		data.put("scheduleid", schedule.getScheduleId());
 		data.put("flowname", schedule.getFlowName());
 		data.put("projectname", schedule.getProjectName());
-		data.put("time", Long.toString(schedule.getFirstSchedTime()));
+		data.put("time", schedule.getFirstSchedTime());
 
 		DateTime time = DateTime.now();
 		long period = 0;
 		if(schedule.getPeriod() != null){
 			period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
 		}
-		data.put("period", Long.toString(period));
-		data.put("length", Long.toString(2 * 3600 * 1000));
+		data.put("period", period);
+		data.put("length", 2 * 3600 * 1000);
+		data.put("history", false);
+
+		output.add(data);
+	}
+	
+	private void ajaxLoadHistory(HttpServletRequest req, HttpServletResponse resp, User user) throws ServletException, IOException {
+		resp.setContentType(JSON_MIME_TYPE);
+		long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
+		long startTime = getLongParam(req, "startTime");
+		DateTime start = new DateTime(startTime);
+		// Ensure start time is 12:00 AM
+		startTime = start.withTime(0, 0, 0, 0).getMillis();
+		boolean useCache = false;
+		if (startTime < today) {
+			useCache = true;
+		}
+		long endTime = startTime + 24 * 3600 * 1000;
+		// long endTime = getLongParam(req, "endTime");
+		int loadAll = getIntParam(req, "loadAll");
+
+		// Cache file
+		File cache = new File("cache/" + startTime + ".cache");
+		cache.getParentFile().mkdirs();
+
+		if (useCache) {
+			// Determine if cache exists
+			boolean cacheExists = false;
+			synchronized (this) {
+				cacheExists = cache.exists() && cache.isFile();
+			}
+			if (cacheExists) {
+				// Send the cache instead
+				InputStream cacheInput = new FileInputStream(cache);
+				Utils.copyStream(cacheInput, resp.getOutputStream());
+				// System.out.println("Using cache copy for " + start);
+				return;
+			}
+		}
+
+		// Load data if not cached
+		List<ExecutableFlow> history = null;
+		try {
+			AzkabanWebServer server = (AzkabanWebServer) getApplication();
+			ExecutorManager executorManager = server.getExecutorManager();
+			history = executorManager.getExecutableFlows(null, null, null, 0, startTime, endTime, -1, -1);
+		} catch (ExecutorManagerException e) {
+			// Return empty should suffice
+		}
+		HashMap<String, Object> ret = new HashMap<String, Object>();
+		List<HashMap<String, Object>> output = new ArrayList<HashMap<String, Object>>();
+		ret.put("items", output);
+		for (ExecutableFlow historyItem : history) {
+			// Check if it is an scheduled execution
+			if (historyItem.getScheduleId() >= 0 || loadAll != 0) {
+				writeHistoryData(output, historyItem);
+			}
+		}
+
+		// Make sure we're ready to cache it, otherwise output and return
+		synchronized (this) {
+			if (!useCache || cache.exists()) {
+				JSONUtils.toJSON(ret, resp.getOutputStream(), false);
+				return;
+			}
+		}
+		
+		//Create cache file
+		File cacheTemp = new File("cache/" + startTime + ".tmp");
+		cacheTemp.createNewFile();
+		OutputStream cacheOutput = new FileOutputStream(cacheTemp);
+
+		// Write to both the cache file and web output
+		JSONUtils.toJSON(ret, new SplitterOutputStream(cacheOutput, resp.getOutputStream()), false);
+		// System.out.println("Writing cache file for " + start);
+		// JSONUtils.toJSON(ret, new JSONCompressorOutputStream(resp.getOutputStream()), false);
+		
+		//Move cache file
+		synchronized (this) {
+			cacheTemp.renameTo(cache);
+		}
+	}
+
+	private void writeHistoryData(List<HashMap<String, Object>> output, ExecutableFlow history) {
+		HashMap<String, Object> data = new HashMap<String, Object>();
+
+		data.put("scheduleid", history.getScheduleId());
+		Project project = projectManager.getProject(history.getProjectId());
+		data.put("flowname", history.getFlowId());
+		data.put("projectname", project.getName());
+		data.put("time", history.getStartTime());
+		data.put("period", "0");
+		long endTime = history.getEndTime();
+		if(endTime == -1){
+			endTime = System.currentTimeMillis();
+		}
+		data.put("length", endTime - history.getStartTime());
+		data.put("history", true);
+		data.put("status", history.getStatus().getNumVal());
 
 		output.add(data);
 	}
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index d272251..5a840f9 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -123,21 +123,26 @@ $(function() {
 					}
 				}
 
+				var gDayViewOuterGroup = svg.group(gMain);
+				var gDayView = svg.group(gDayViewOuterGroup, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
 				if(isThisWeek != -1)
 				{
 					var date = new Date(firstDay + dayMillisConst * isThisWeek);
 					var day = date.getDate();
 					var gDay = svg.group(gMain, {transform: "translate(" + (border + timeWidth + isThisWeek * dayWidth) + "," + header + ")"});
 					svg.rect(gDay, 0, -header, dayWidth, 24 * lineHeight + header, {fill : "none", stroke : "#06F"});
+					var lineY = Math.floor(today.getHours() * lineHeight + today.getMinutes() * lineHeight / 60);
+					svg.line(gDay, 0, lineY, dayWidth, lineY, {fill : "none", stroke : "#06F", strokeWidth : 4});
 				}
 
-				var gDayView = svg.group(gMain, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
 				//A list of all items
 				var itemByDay = new Array();
 				for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
 					itemByDay[deltaDay] = new Array();
 				}
 
+				var itemByScheduleIdMap = {};
+
 				function filterApplies(item) {
 					for(var i = 0; i < filterProject.length; i++) {
 						if(item.projectname == filterProject[i].projectname) {
@@ -156,7 +161,7 @@ $(function() {
 				function renderDays() {
 					//Clear items inside the day view
 					svg.remove(gDayView);
-					gDayView = svg.group(gMain, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
+					gDayView = svg.group(gDayViewOuterGroup, {transform: "translate(" + (border + timeWidth) + "," + header + ")"});
 
 					//Add day groups
 					for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
@@ -214,7 +219,11 @@ $(function() {
 								var startTime = new Date(item.time);
 								var startY = Math.floor(startTime.getHours() * lineHeight + startTime.getMinutes() * lineHeight / 60);
 								var endTime = new Date(item.time + item.length );
-								var endY = Math.floor(startY + (item.length * lineHeight) / hourMillisConst);
+								var endY = Math.ceil(startY + (item.length * lineHeight) / hourMillisConst);
+								var deltaY = Math.ceil(endY - startY);
+								if(deltaY < 5){
+									deltaY = 5;
+								}
 								//var anchor = svg.a(gColumn);
 								var itemUrl = contextURL + "/manager?project=" + item.projectname + "&flow=" + item.flowname;
 								var gItem = svg.link(gColumn, itemUrl, {transform: "translate(0," + startY + ")"});
@@ -231,7 +240,7 @@ $(function() {
 								gItem.addEventListener('mouseout', handleMouseOut);
 
 								//$(gItem).attr("style","color:red");
-								var rect = svg.rect(gItem, 0, 0, Math.ceil(dayWidth / columns.length), Math.floor(endY - startY), 0, 0, {fill : "#7E7", stroke : "#444", strokeWidth : 1});
+								var rect = svg.rect(gItem, 0, 0, Math.ceil(dayWidth / columns.length), deltaY, 0, 0, {fill : item.item.color, stroke : "#444", strokeWidth : 1});
 								
 								item.rect = rect;
 								//Draw text
@@ -241,25 +250,29 @@ $(function() {
 					}
 				}
 
-				function processItem(item)
+				function processItem(item, scheduled)
 				{
 					var firstTime = item.time;
 					var startTime = firstDay;
 					var endTime = firstDay + numDays * dayMillisConst;
 					var period = item.period;
+					var restrictedStartTime = Math.max(firstDay, today.valueOf());
+					if(!scheduled){
+						restrictedStartTime = firstDay;
+					}
 
 					// Shift time until we're past the start time
 					if (period > 0) {
 						// Calculate next execution time efficiently
 						// Take into account items that ends in the date specified, but does not start on that date
-						var periods = Math.floor((startTime - (firstTime + item.length)) / period);
+						var periods = Math.floor((restrictedStartTime - (firstTime)) / period);
 						//Make sure we don't subtract
 						if(periods < 0){
 							periods = 0;
 						}
 						firstTime += period * periods;
 						// Increment in case we haven't arrived yet. This will apply to most of the cases
-						while (firstTime + item.length < startTime) {
+						while (firstTime < restrictedStartTime) {
 							firstTime += period;
 						}
 					}
@@ -267,18 +280,18 @@ $(function() {
 					// Bad or no period
 					if (period <= 0) {
 						// Single instance case
-						if (firstTime >= startTime && firstTime < endTime) {
-							addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
+						if (firstTime >= restrictedStartTime && firstTime < endTime) {
+							addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
 						}
 					}
 					else {
 						if(period <= hourMillisConst) {
-							addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: endTime - firstTime, item: item});
+							addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: endTime - firstTime, item: item});
 						}
 						else{
 							// Repetitive schedule, firstTime is assumed to be after startTime
 							while (firstTime < endTime) {
-								addItem({flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
+								addItem({scheduleid: item.scheduleid, flowname : item.flowname, projectname: item.projectname, time: firstTime, length: item.length, item: item});
 								firstTime += period;
 							}
 						}
@@ -305,7 +318,7 @@ $(function() {
 								tempLength = dayMillisConst;
 							}
 
-							var item2 = {time: nextMorning, length: tempLength, projectname: obj.projectname, flowname: obj.flowname, item: obj.item};
+							var item2 = {scheduleid: obj.scheduleid, time: nextMorning, length: tempLength, projectname: obj.projectname, flowname: obj.flowname, item: obj.item};
 							addItem(item2);
 							excess -= tempLength;
 							nextMorning += dayMillisConst;
@@ -318,7 +331,12 @@ $(function() {
 					{
 						//Add the item to the rendering list
 						itemByDay[index].push(obj);
-						obj.item.objs.push(obj);
+						//obj.item.objs.push(obj);
+
+						if(!itemByScheduleIdMap[obj.scheduleid]){
+							itemByScheduleIdMap[obj.scheduleid] = new Array();
+						}
+						itemByScheduleIdMap[obj.scheduleid].push(obj);
 					}
 				}
 
@@ -350,27 +368,54 @@ $(function() {
 						"\"" + obj.flowname + "\" from \"" + obj.projectname + "\"",
 						"Repeat: " + formatReadablePeriod(obj.item.period)
 					];
+
+					if(obj.item.period == 0){
+						text[1] = "";
+						if(obj.item.history == true) {
+							if(obj.item.status == 50){
+								text[1] = "SUCCEEDED";
+							}
+							else if(obj.item.status == 60){
+								text[1] = "KILLED";
+							}
+							else if(obj.item.status == 70){
+								text[1] = "FAILED";
+							}
+							else if(obj.item.status == 80){
+								text[1] = "FAILED_FINISHING";
+							}
+							else if(obj.item.status == 90){
+								text[1] = "SKIPPED";
+							}
+						}
+					}
 					var textLength = Math.max(measureText(svg, text[0], {fontSize: "13"}), measureText(svg, text[1], {fontSize: "13"}));
 					var rect = svg.rect(tooltip, 0, -40, textLength + 4, 40, {fill : "#FFF", stroke : "none"});
 					svg.text(tooltip, 2, -25, text[0], {fontSize: "13", fill : "#000", stroke : "none"});
 					svg.text(tooltip, 2, -5, text[1], {fontSize: "13", fill : "#000", stroke : "none"});
 
-					//Item highlight effect
-					for(var i = 0; i < obj.item.objs.length; i++) {
-						var obj2 = obj.item.objs[i];
-						$(obj2.rect).attr("fill", "#F00");
-					}
-
 					//Store tooltip
 					$(this).data("tooltip", tooltip);
+
+					if(itemByScheduleIdMap[obj.scheduleid]){
+						//Item highlight effect
+						var arry = itemByScheduleIdMap[obj.scheduleid];
+						for(var i = 0; i < arry.length; i++) {
+							$(arry[i].rect).attr("fill", "#FF0");
+						}
+					}
 				}
 
 				function handleMouseOut(event) {
 					//Item highlight effect
 					var obj = $(this).data("item");
-					for(var i = 0; i < obj.item.objs.length; i++) {
-						var obj2 = obj.item.objs[i];
-						$(obj2.rect).attr("fill", "#7E7");
+						//Item highlight effect
+					if(itemByScheduleIdMap[obj.scheduleid]){
+						var arry = itemByScheduleIdMap[obj.scheduleid];
+						for(var i = 0; i < arry.length; i++) {
+							var obj2 = obj.item.objs[i];
+							$(arry[i].rect).attr("fill", arry[i].item.color);
+						}
 					}
 					//Clear the fade interval
 					$($(this).data("tooltip")).fadeOut(250, function(){ svg.remove(this); });
@@ -390,16 +435,40 @@ $(function() {
 						//Sort items by day
 						for(var i = 0; i < items.length; i++)
 						{
-							items[i].length = hourMillisConst; //TODO: parseInt(items[i].length);
-							items[i].time = parseInt(items[i].time);
-							items[i].period = parseInt(items[i].period);
+							items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
 							items[i].objs = new Array();
-							processItem(items[i]);
+							items[i].color = "#69F";
+							processItem(items[i], true);
 						}
 						//Trigger a re-rendering of all the data
 						renderDays();
 					}
 				});
+				for(var deltaDay = 0; deltaDay < numDays; deltaDay++) {
+					$.ajax({
+						type: "GET",
+						url: requestURL,
+						data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 1},
+						//dataType: "json",
+						success: function (data)
+						{
+							var items = data.items;
+
+							//Sort items by day
+							for(var i = 0; i < items.length; i++)
+							{
+								//if(items[i].length < 5 * 60 * 1000) items[i].length = 5 * 60 * 1000;
+								items[i].objs = new Array();
+								items[i].color = "#7E7";
+								if(items[i].status == 60 || items[i].status == 70 || items[i].status == 80)
+									items[i].color = "#E77";
+								processItem(items[i], false);
+							}
+							//Trigger a re-rendering of all the data
+							renderDays();
+						}
+					});
+				}
 			}
 		}, settings : {
 			"xmlns" : "http://www.w3.org/2000/svg",