azkaban-uncached

Refactored the way we do ExecutionReferences. Seriously,

10/16/2012 5:23:25 AM

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 57dcf13..941a958 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -3,6 +3,7 @@ package azkaban.executor;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
@@ -218,6 +219,62 @@ public class ExecutorManager {
 		return execFlows;
 	}
 
+	public List<ExecutionReference> getFlowHistory(long startTime, long endTime) {
+		ArrayList<ExecutionReference> references = new ArrayList<ExecutionReference>();
+		
+		for (ExecutionReference ref: runningReference.values()) {
+			if (between(ref, startTime, endTime)) {
+				references.add(ref);
+			}
+		}
+		
+		File archivePath = new File(basePath, ARCHIVE_DIR);
+		if (!archivePath.exists()) {
+			return references;
+		}
+
+		
+		// This makes an assumption that there's no job that will run a day past the endTime
+		// nor a day before the endTime. We use 100000, because that's what the archive
+		// partitions by.
+		final long startThreshold = startTime - 100000;
+		final long endThreshold = endTime + 100000;
+		
+		File[] archivePartitionsDir = archivePath.listFiles( new FileFilter() {
+			@Override
+			public boolean accept(File pathname) {
+				String name = pathname.getName();
+				long val = Long.valueOf(name);
+				return val >= startThreshold && val <= endThreshold;
+			}
+		}
+		);
+
+		for (File partition: archivePartitionsDir) {
+			File[] refFiles = partition.listFiles();
+			for (File refFile: refFiles) {
+				try {
+					ExecutionReference reference = ExecutionReference.readFromFile(refFile);
+					if (between(reference, startTime, endTime)) {
+						references.add(reference);
+					}
+				} catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		
+		return references;
+	}
+	
+	private boolean between(ExecutionReference ref, long startTime, long endTime) {
+		long time = System.currentTimeMillis();
+		long refStart = ref.getStartTime() == -1 ? time : ref.getStartTime();
+		long refEnd = ref.getEndTime() == -1 ? time : ref.getEndTime();
+		
+		return endTime > refStart && startTime <= refEnd;
+	}
+	
 	public List<ExecutionReference> getFlowHistory(String regexPattern, int numResults, int skip) {
 		ArrayList<ExecutionReference> searchFlows = new ArrayList<ExecutionReference>();
 
@@ -229,12 +286,11 @@ public class ExecutorManager {
 			return searchFlows;
 		}
 		
-		for (ExecutableFlow flow: runningFlows.values()) {
+		for (ExecutionReference ref: runningReference.values()) {
 			if (skip > 0) {
 				skip--;
 			}
 			else {
-				ExecutionReference ref = new ExecutionReference(flow);
 				if(pattern.matcher(ref.getFlowId()).find() ) {
 					searchFlows.add(ref);
 				}
@@ -244,7 +300,7 @@ public class ExecutorManager {
 				}
 			}
 		}
-		
+
 		File archivePath = new File(basePath, ARCHIVE_DIR);
 		if (!archivePath.exists()) {
 			return searchFlows;
@@ -271,7 +327,11 @@ public class ExecutorManager {
 				}
 				else {
 					try {
-						ExecutionReference ref = ExecutionReference.readFromDirectory(listArchivePartitions[i]);
+						ExecutionReference ref = ExecutionReference.readFromFile(listArchivePartitions[i]);
+						if (ref == null) {
+							continue;
+						}
+
 						if(pattern.matcher(ref.getFlowId()).find() ) {
 							searchFlows.add(ref);
 						}
@@ -298,9 +358,9 @@ public class ExecutorManager {
 			return;
 		}
 		
-		for (File activeFlowDir: activeFlowDirs) {
-			if (activeFlowDir.isDirectory()) {
-				ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
+		for (File activeFlowFile: activeFlowDirs) {
+			if (activeFlowFile.isFile()) {
+				ExecutionReference reference = ExecutionReference.readFromFile(activeFlowFile);
 				if (reference.getExecutorUrl() == null) {
 					reference.setExecutorPort(portNumber);
 					reference.setExecutorUrl(url);
@@ -318,7 +378,7 @@ public class ExecutorManager {
 				}
 			}
 			else {
-				logger.info("Path " + activeFlowDir + " not a directory.");
+				logger.info("Path " + activeFlowFile + " not a directory.");
 			}
 		}
 	}
@@ -391,22 +451,22 @@ public class ExecutorManager {
 		
 		// Check active
 		File baseActiveDir = new File(basePath, ACTIVE_DIR);
-		File referenceDir = new File(baseActiveDir, executionId);
+		File referenceFile = new File(baseActiveDir, executionId + ".json");
 		
-		if (!referenceDir.exists()) {
+		if (!referenceFile.exists()) {
 			// Find the partition it would be in and search.
 			String partition = getExecutableReferencePartition(executionId);
 			
 			File baseArchiveDir = new File(basePath, ARCHIVE_DIR + File.separator + partition);
-			referenceDir = new File(baseArchiveDir, executionId);
-			if (!referenceDir.exists()) {
-				throw new ExecutorManagerException("Execution id '" + executionId + "' not found. Searching " + referenceDir);
+			referenceFile = new File(baseArchiveDir, executionId + ".json");
+			if (!referenceFile.exists()) {
+				throw new ExecutorManagerException("Execution id '" + executionId + "' not found. Searching " + referenceFile);
 			}
 		}
 
 		ExecutionReference reference = null;
 		try {
-			reference = ExecutionReference.readFromDirectory(referenceDir);
+			reference = ExecutionReference.readFromFile(referenceFile);
 		} catch (IOException e) {
 			throw new ExecutorManagerException(e.getMessage(), e);
 		}
@@ -430,15 +490,14 @@ public class ExecutorManager {
 		}
 
 		// Create execution reference directory
-		File referenceDir = new File(activeDirectory, flow.getExecutionId());
-		referenceDir.mkdirs();
+		File referenceFile = new File(activeDirectory, flow.getExecutionId() + ".json");
 
 		// We don't really need to save the reference, 
 		ExecutionReference reference = new ExecutionReference(flow);
 		reference.setExecutorUrl(url);
 		reference.setExecutorPort(portNumber);
 		try {
-			reference.writeToDirectory(referenceDir);
+			reference.writeToFile(referenceFile);
 		} catch (IOException e) {
 			throw new ExecutorManagerException("Couldn't write execution to directory.", e);
 		}
@@ -913,22 +972,23 @@ public class ExecutorManager {
 		}
 	}
 	
+	// This along with getExecutableArchivePartitionNumber must be kept consistant.
 	private String getExecutableReferencePartition(String execID) {
 		// We're partitioning the archive by the first part of the id, which should be a timestamp.
-		// Then we're taking a substring of length - 6 to lop off the bottom 5 digits effectively partitioning
-		// by 100000 millisec. We do this to have quicker searchs by pulling partitions, not full directories.
+		// Then we're taking a substring of length - 6 to lop off the bottom 5 digits effectively truncating
+		// by 1000000
 		int index = execID.indexOf('.');
-		return execID.substring(0, index - 8);
+		return execID.substring(0, index - 8) + "00000000";
 	}
-
+	
 	private void cleanExecutionReferenceJob(ExecutionReference reference) throws ExecutorManagerException {
 		// Write final file
 		String exId = reference.getExecId();
-		String activeReferencePath = ACTIVE_DIR + File.separator + exId; 
-		File activeDirectory = new File(basePath, activeReferencePath);
-		if (!activeDirectory.exists()) {
-			logger.error("WTF!! Active reference " + activeDirectory + " directory doesn't exist.");
-			throw new ExecutorManagerException("Active reference " + activeDirectory + " doesn't exists.");
+		String activeReferenceFile = ACTIVE_DIR + File.separator + exId + ".json"; 
+		File activeFile = new File(basePath, activeReferenceFile);
+		if (!activeFile.exists()) {
+			logger.error("WTF!! Active reference " + activeFile + " file doesn't exist.");
+			throw new ExecutorManagerException("Active reference " + activeFile + " doesn't exists.");
 		}
 		
 		String partitionVal = getExecutableReferencePartition(exId);
@@ -939,36 +999,24 @@ public class ExecutorManager {
 			archivePartitionDir.mkdirs();
 		}
 
-		File archiveDirectory = new File(archivePartitionDir, exId);
-		if (archiveDirectory.exists()) {
-			logger.error("Archive reference already exists. Cleaning up.");
+		String archiveJSONFile = exId + ".json";
+		File archiveFile = new File(archivePartitionDir, archiveJSONFile);
+		if (!archiveFile.exists()) {
+			logger.error("Archive reference doesn't exist. We're writing it.");
 			try {
-				FileUtils.deleteDirectory(activeDirectory);
+				reference.writeToFile(archiveFile);
 			} catch (IOException e) {
-				logger.error(e);
+				throw new ExecutorManagerException("Couldn't write execution to directory.", e);
 			}
-			
-			return;
 		}
-		
-		// Make new archive dir
-		if (!archiveDirectory.mkdirs()) {
-			throw new ExecutorManagerException("Cannot create " + archiveDirectory);
-		}
-		
+
 		try {
-			reference.writeToDirectory(archiveDirectory);
+			reference.writeToFile(archiveFile);
 		} catch (IOException e) {
-			throw new ExecutorManagerException("Couldn't write execution to directory.", e);
+			throw new ExecutorManagerException("Couldn't write execution to file " + archiveFile, e);
 		}
 		
-		// Move file.
-		try {
-			FileUtils.deleteDirectory(activeDirectory);
-		} catch (IOException e) {
-			throw new ExecutorManagerException("Cannot cleanup active directory " + activeDirectory);
-		}
-	
+		activeFile.delete();
 		runningReference.remove(exId);
 	}
 	
@@ -1190,19 +1238,20 @@ public class ExecutorManager {
 			return obj;
 		}
 		
-		public void writeToDirectory(File directory) throws IOException {
-			File file = new File(directory, "execution.json");
-			if (!file.exists()) {
-				JSONUtils.toJSON(this.toObject(), file);
+		public void writeToFile(File jsonFile) throws IOException {
+			if (!jsonFile.exists()) {
+				JSONUtils.toJSON(this.toObject(), jsonFile);
 			}
 		}
 		
-		public static ExecutionReference readFromDirectory(File directory) throws IOException {
-			File file = new File(directory, "execution.json");
+		public static ExecutionReference readFromFile(File file) throws IOException {
 			if (!file.exists()) {
 				throw new IOException("Execution file execution.json does not exist.");
 			}
 			
+			if (file.isDirectory()) {
+				return null;
+			}
 			@SuppressWarnings("unchecked")
 			HashMap<String, Object> obj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(file);
 			ExecutionReference reference = new ExecutionReference();
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 0ac8945..727abc2 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -161,6 +161,24 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		return defaultVal;
 	}
 
+	public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Long.valueOf(p);
+	}
+	
+	public long getLongParam(HttpServletRequest request, String name, long defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getLongParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		
+		return defaultVal;
+	}
+
+	
 	public Map<String, String> getParamGroup(HttpServletRequest request, String groupName)  throws ServletException {
 		@SuppressWarnings("unchecked")
 		Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
diff --git a/src/java/azkaban/webapp/servlet/HistoryServlet.java b/src/java/azkaban/webapp/servlet/HistoryServlet.java
index 4ffb6f3..6969ece 100644
--- a/src/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/src/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -1,6 +1,10 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 
 import javax.servlet.ServletConfig;
@@ -10,6 +14,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManager.ExecutionReference;
+import azkaban.utils.JSONUtils;
 import azkaban.webapp.session.Session;
 
 public class HistoryServlet extends LoginAbstractAzkabanServlet {
@@ -26,46 +31,17 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
 	@Override
 	protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
 			Session session) throws ServletException, IOException {
-		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/historypage.vm");
-		int pageNum = getIntParam(req, "page", 1);
-		int pageSize = getIntParam(req, "size", 16);
-		
-		if (pageNum < 0) {
-			pageNum = 1;
-		}
-		
-		List<ExecutionReference> history = executorManager.getFlowHistory(".*", pageSize, (pageNum - 1)*pageSize);
-		page.add("flowHistory", history);
-		page.add("size", pageSize);
-		page.add("page", pageNum);
 		
-		if (pageNum == 1) {
-			page.add("previous", new PageSelection(1, pageSize, true, false));
+		if (hasParam(req, "timeline")) {
+			handleHistoryTimelinePage(req, resp, session);
 		}
-		page.add("next", new PageSelection(pageNum + 1, pageSize, false, false));
-		// Now for the 5 other values.
-		int pageStartValue = 1;
-		if (pageNum > 3) {
-			pageStartValue = pageNum - 2;
+		else {
+			handleHistoryPage(req, resp, session);
 		}
-		
-		page.add("page1", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
-		pageStartValue++;
-		page.add("page2", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
-		pageStartValue++;
-		page.add("page3", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
-		pageStartValue++;
-		page.add("page4", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
-		pageStartValue++;
-		page.add("page5", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
-		pageStartValue++;
-		
-		page.render();
 	}
 
 	@Override
-	protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
-			Session session) throws ServletException, IOException {
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
 		if (hasParam(req, "action")) {
 			String action = getParam(req, "action");
 			if (action.equals("search")) {
@@ -118,6 +94,70 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
 		
 	}
 
+	private void handleHistoryPage(HttpServletRequest req, HttpServletResponse resp, Session session) {
+		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/historypage.vm");
+		int pageNum = getIntParam(req, "page", 1);
+		int pageSize = getIntParam(req, "size", 16);
+		
+		if (pageNum < 0) {
+			pageNum = 1;
+		}
+		
+		List<ExecutionReference> history = executorManager.getFlowHistory(".*", pageSize, (pageNum - 1)*pageSize);
+		page.add("flowHistory", history);
+		page.add("size", pageSize);
+		page.add("page", pageNum);
+		
+		if (pageNum == 1) {
+			page.add("previous", new PageSelection(1, pageSize, true, false));
+		}
+		page.add("next", new PageSelection(pageNum + 1, pageSize, false, false));
+		// Now for the 5 other values.
+		int pageStartValue = 1;
+		if (pageNum > 3) {
+			pageStartValue = pageNum - 2;
+		}
+		
+		page.add("page1", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page2", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page3", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page4", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page5", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+		pageStartValue++;
+		
+		page.render();
+	}
+	
+	private void handleHistoryTimelinePage(HttpServletRequest req, HttpServletResponse resp, Session session) {
+		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/historytimelinepage.vm");
+		long currentTime = System.currentTimeMillis();
+		long begin = getLongParam(req, "begin", currentTime - 86400000);
+		long end = getLongParam(req, "end", currentTime);
+		
+		page.add("begin", begin);
+		page.add("end", end);
+		
+		List<ExecutionReference> refs = executorManager.getFlowHistory(begin, end);
+		ArrayList<Object> refList = new ArrayList<Object>();
+		for (ExecutionReference ref: refs) {
+			
+			HashMap<String,Object> refObj = new HashMap<String,Object>();
+			refObj.put("execId", ref.getExecId());
+			refObj.put("start", ref.getStartTime());
+			refObj.put("end", ref.getEndTime());
+			refObj.put("status", ref.getStatus().toString());
+			
+			refList.add(refObj);
+		}
+		
+		page.add("data", JSONUtils.toJSON(refList));
+		page.render();
+	}
+	
 	public class PageSelection {
 		private int page;
 		private int size;
diff --git a/src/java/azkaban/webapp/servlet/velocity/historytimelinepage.vm b/src/java/azkaban/webapp/servlet/velocity/historytimelinepage.vm
new file mode 100644
index 0000000..8b726a9
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/historytimelinepage.vm
@@ -0,0 +1,39 @@
+<!DOCTYPE html> 
+<html>
+	<head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+		<script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>    
+		<script type="text/javascript" src="${context}/js/namespace.js"></script>
+		<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.main.view.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.historytimeline.view.js"></script>
+		<script type="text/javascript">
+			var contextURL = "${context}";
+			var currentTime = ${currentTime};
+			var timezone = "${timezone}";
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
+			
+			var begin = $begin;
+			var end = $end;
+			var data = $data;
+		</script>
+	</head>
+	<body>
+		#set($current_page="history")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+		<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>  
+
+		<div class="content">
+			<div id="all-jobs-content">
+				<div class="section-hd">
+					<h2>History Timeline</h2>
+				</div>
+			</div>
+
+		</div>
+	</body>
+</html>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
index 9f47d4b..8c4cb0b 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -42,7 +42,7 @@
 						<h4><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h4>
 					</div>
 					
-					<a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Project Logs</a>
+					<a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Job History</a>
 				</div>
 			</div>
 			
diff --git a/src/web/js/azkaban.historytimeline.view.js b/src/web/js/azkaban.historytimeline.view.js
new file mode 100644
index 0000000..95829bc
--- /dev/null
+++ b/src/web/js/azkaban.historytimeline.view.js
@@ -0,0 +1,35 @@
+$.namespace('azkaban');
+
+var timelineModel;
+azkaban.TimelineModel = Backbone.Model.extend({});
+
+azkaban.JobLogView = Backbone.View.extend({
+	events: {
+	},
+	initialize: function(settings) {
+	}
+});
+
+var showDialog = function(title, message) {
+  $('#messageTitle').text(title);
+
+  $('#messageBox').text(message);
+
+  $('#messageDialog').modal({
+      closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+      position: ["20%",],
+      containerId: 'confirm-container',
+      containerCss: {
+        'height': '220px',
+        'width': '565px'
+      },
+      onShow: function (dialog) {
+      }
+    });
+}
+
+
+$(function() {
+	timelineModel = new azkaban.TimelineModel();
+	jobLogView = new azkaban.JobLogView({el:$('#jobLogView'), model: timelineModel});
+});