azkaban-uncached
Changes
src/java/azkaban/executor/ExecutorManager.java 157(+103 -54)
src/java/azkaban/webapp/servlet/HistoryServlet.java 108(+74 -34)
src/web/js/azkaban.historytimeline.view.js 35(+35 -0)
Details
src/java/azkaban/executor/ExecutorManager.java 157(+103 -54)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 57dcf13..941a958 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -3,6 +3,7 @@ package azkaban.executor;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
@@ -218,6 +219,62 @@ public class ExecutorManager {
return execFlows;
}
+ public List<ExecutionReference> getFlowHistory(long startTime, long endTime) {
+ ArrayList<ExecutionReference> references = new ArrayList<ExecutionReference>();
+
+ for (ExecutionReference ref: runningReference.values()) {
+ if (between(ref, startTime, endTime)) {
+ references.add(ref);
+ }
+ }
+
+ File archivePath = new File(basePath, ARCHIVE_DIR);
+ if (!archivePath.exists()) {
+ return references;
+ }
+
+
+ // This assumes no flow runs more than one archive partition width (100000000 ms, roughly
+ // 28 hours) past the endTime or before the startTime. The window is widened by the same
+ // width that getExecutableReferencePartition partitions by.
+ final long startThreshold = startTime - 100000000L;
+ final long endThreshold = endTime + 100000000L;
+
+ File[] archivePartitionsDir = archivePath.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+ try {
+ long val = Long.valueOf(name);
+ return val >= startThreshold && val <= endThreshold;
+ } catch (NumberFormatException e) {
+ // Ignore entries that aren't numeric partition directories.
+ return false;
+ }
+ }
+ });
+
+ for (File partition: archivePartitionsDir) {
+ File[] refFiles = partition.listFiles();
+ for (File refFile: refFiles) {
+ try {
+ ExecutionReference reference = ExecutionReference.readFromFile(refFile);
+ // readFromFile returns null for legacy per-execution directories; skip those.
+ if (reference != null && between(reference, startTime, endTime)) {
+ references.add(reference);
+ }
+ } catch (IOException e) {
+ logger.error("Unable to read execution reference " + refFile, e);
+ }
+ }
+ }
+
+ return references;
+ }
+
+ private boolean between(ExecutionReference ref, long startTime, long endTime) {
+ long time = System.currentTimeMillis();
+ long refStart = ref.getStartTime() == -1 ? time : ref.getStartTime();
+ long refEnd = ref.getEndTime() == -1 ? time : ref.getEndTime();
+
+ return endTime > refStart && startTime <= refEnd;
+ }
+
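Note: the new getFlowHistory(startTime, endTime) keeps any reference whose interval overlaps the query window, and between() treats a start or end time of -1 (a still-running flow) as "now". A minimal standalone sketch of that overlap test; the class name and sample values below are illustrative only and not part of this change:

    public class OverlapSketch {
        // Mirrors between(): a window [windowStart, windowEnd) overlaps a reference
        // [refStart, refEnd], where -1 means "still running" and is clamped to now.
        static boolean overlaps(long refStart, long refEnd, long windowStart, long windowEnd) {
            long now = System.currentTimeMillis();
            long s = (refStart == -1) ? now : refStart;
            long e = (refEnd == -1) ? now : refEnd;
            return windowEnd > s && windowStart <= e;
        }

        public static void main(String[] args) {
            long hour = 3600L * 1000L;
            long now = System.currentTimeMillis();
            // Started two hours ago and still running: overlaps the last hour.
            System.out.println(overlaps(now - 2 * hour, -1, now - hour, now));              // true
            // Finished three hours ago: outside the last hour.
            System.out.println(overlaps(now - 5 * hour, now - 3 * hour, now - hour, now));  // false
        }
    }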
public List<ExecutionReference> getFlowHistory(String regexPattern, int numResults, int skip) {
ArrayList<ExecutionReference> searchFlows = new ArrayList<ExecutionReference>();
@@ -229,12 +286,11 @@ public class ExecutorManager {
return searchFlows;
}
- for (ExecutableFlow flow: runningFlows.values()) {
+ for (ExecutionReference ref: runningReference.values()) {
if (skip > 0) {
skip--;
}
else {
- ExecutionReference ref = new ExecutionReference(flow);
if(pattern.matcher(ref.getFlowId()).find() ) {
searchFlows.add(ref);
}
@@ -244,7 +300,7 @@ public class ExecutorManager {
}
}
}
-
+
File archivePath = new File(basePath, ARCHIVE_DIR);
if (!archivePath.exists()) {
return searchFlows;
@@ -271,7 +327,11 @@ public class ExecutorManager {
}
else {
try {
- ExecutionReference ref = ExecutionReference.readFromDirectory(listArchivePartitions[i]);
+ ExecutionReference ref = ExecutionReference.readFromFile(listArchivePartitions[i]);
+ if (ref == null) {
+ continue;
+ }
+
if(pattern.matcher(ref.getFlowId()).find() ) {
searchFlows.add(ref);
}
@@ -298,9 +358,9 @@ public class ExecutorManager {
return;
}
- for (File activeFlowDir: activeFlowDirs) {
- if (activeFlowDir.isDirectory()) {
- ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
+ for (File activeFlowFile: activeFlowDirs) {
+ if (activeFlowFile.isFile()) {
+ ExecutionReference reference = ExecutionReference.readFromFile(activeFlowFile);
if (reference.getExecutorUrl() == null) {
reference.setExecutorPort(portNumber);
reference.setExecutorUrl(url);
@@ -318,7 +378,7 @@ public class ExecutorManager {
}
}
else {
- logger.info("Path " + activeFlowDir + " not a directory.");
+ logger.info("Path " + activeFlowFile + " not a directory.");
}
}
}
@@ -391,22 +451,22 @@ public class ExecutorManager {
// Check active
File baseActiveDir = new File(basePath, ACTIVE_DIR);
- File referenceDir = new File(baseActiveDir, executionId);
+ File referenceFile = new File(baseActiveDir, executionId + ".json");
- if (!referenceDir.exists()) {
+ if (!referenceFile.exists()) {
// Find the partition it would be in and search.
String partition = getExecutableReferencePartition(executionId);
File baseArchiveDir = new File(basePath, ARCHIVE_DIR + File.separator + partition);
- referenceDir = new File(baseArchiveDir, executionId);
- if (!referenceDir.exists()) {
- throw new ExecutorManagerException("Execution id '" + executionId + "' not found. Searching " + referenceDir);
+ referenceFile = new File(baseArchiveDir, executionId + ".json");
+ if (!referenceFile.exists()) {
+ throw new ExecutorManagerException("Execution id '" + executionId + "' not found. Searching " + referenceFile);
}
}
ExecutionReference reference = null;
try {
- reference = ExecutionReference.readFromDirectory(referenceDir);
+ reference = ExecutionReference.readFromFile(referenceFile);
} catch (IOException e) {
throw new ExecutorManagerException(e.getMessage(), e);
}
@@ -430,15 +490,14 @@ public class ExecutorManager {
}
// Create execution reference directory
- File referenceDir = new File(activeDirectory, flow.getExecutionId());
- referenceDir.mkdirs();
+ File referenceFile = new File(activeDirectory, flow.getExecutionId() + ".json");
// We don't really need to save the reference,
ExecutionReference reference = new ExecutionReference(flow);
reference.setExecutorUrl(url);
reference.setExecutorPort(portNumber);
try {
- reference.writeToDirectory(referenceDir);
+ reference.writeToFile(referenceFile);
} catch (IOException e) {
throw new ExecutorManagerException("Couldn't write execution to directory.", e);
}
@@ -913,22 +972,23 @@ public class ExecutorManager {
}
}
+ // This, along with getExecutableArchivePartitionNumber, must be kept consistent.
private String getExecutableReferencePartition(String execID) {
// We're partitioning the archive by the first part of the id, which should be a timestamp.
- // Then we're taking a substring of length - 6 to lop off the bottom 5 digits effectively partitioning
- // by 100000 millisec. We do this to have quicker searchs by pulling partitions, not full directories.
+ // Then we take a substring up to index - 8 to lop off the bottom 8 digits, effectively
+ // truncating to 100000000 ms partitions. This keeps searches quick by pulling partitions, not full directories.
int index = execID.indexOf('.');
- return execID.substring(0, index - 8);
+ return execID.substring(0, index - 8) + "00000000";
}
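Note: a worked example of the partition naming above, assuming execution ids of the form "<millisecond timestamp>.<flow name>" (the sample id below is made up). Zeroing the bottom 8 digits means each partition directory spans 100000000 ms, roughly 28 hours:

    public class PartitionSketch {
        // Same computation as getExecutableReferencePartition above.
        static String partition(String execId) {
            int index = execId.indexOf('.');
            return execId.substring(0, index - 8) + "00000000";
        }

        public static void main(String[] args) {
            System.out.println(partition("1334611200000.myflow")); // prints 1334600000000
        }
    }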
-
+
private void cleanExecutionReferenceJob(ExecutionReference reference) throws ExecutorManagerException {
// Write final file
String exId = reference.getExecId();
- String activeReferencePath = ACTIVE_DIR + File.separator + exId;
- File activeDirectory = new File(basePath, activeReferencePath);
- if (!activeDirectory.exists()) {
- logger.error("WTF!! Active reference " + activeDirectory + " directory doesn't exist.");
- throw new ExecutorManagerException("Active reference " + activeDirectory + " doesn't exists.");
+ String activeReferenceFile = ACTIVE_DIR + File.separator + exId + ".json";
+ File activeFile = new File(basePath, activeReferenceFile);
+ if (!activeFile.exists()) {
+ logger.error("WTF!! Active reference " + activeFile + " file doesn't exist.");
+ throw new ExecutorManagerException("Active reference " + activeFile + " doesn't exists.");
}
String partitionVal = getExecutableReferencePartition(exId);
@@ -939,36 +999,24 @@ public class ExecutorManager {
archivePartitionDir.mkdirs();
}
- File archiveDirectory = new File(archivePartitionDir, exId);
- if (archiveDirectory.exists()) {
- logger.error("Archive reference already exists. Cleaning up.");
+ String archiveJSONFile = exId + ".json";
+ File archiveFile = new File(archivePartitionDir, archiveJSONFile);
+ if (!archiveFile.exists()) {
+ logger.error("Archive reference doesn't exist. We're writing it.");
try {
- FileUtils.deleteDirectory(activeDirectory);
+ reference.writeToFile(archiveFile);
} catch (IOException e) {
- logger.error(e);
+ throw new ExecutorManagerException("Couldn't write execution to directory.", e);
}
-
- return;
}
-
- // Make new archive dir
- if (!archiveDirectory.mkdirs()) {
- throw new ExecutorManagerException("Cannot create " + archiveDirectory);
- }
-
+
try {
- reference.writeToDirectory(archiveDirectory);
+ reference.writeToFile(archiveFile);
} catch (IOException e) {
- throw new ExecutorManagerException("Couldn't write execution to directory.", e);
+ throw new ExecutorManagerException("Couldn't write execution to file " + archiveFile, e);
}
- // Move file.
- try {
- FileUtils.deleteDirectory(activeDirectory);
- } catch (IOException e) {
- throw new ExecutorManagerException("Cannot cleanup active directory " + activeDirectory);
- }
-
+ if (!activeFile.delete()) {
+ logger.error("Unable to delete active reference file " + activeFile);
+ }
runningReference.remove(exId);
}
@@ -1190,19 +1238,20 @@ public class ExecutorManager {
return obj;
}
- public void writeToDirectory(File directory) throws IOException {
- File file = new File(directory, "execution.json");
- if (!file.exists()) {
- JSONUtils.toJSON(this.toObject(), file);
+ public void writeToFile(File jsonFile) throws IOException {
+ if (!jsonFile.exists()) {
+ JSONUtils.toJSON(this.toObject(), jsonFile);
}
}
- public static ExecutionReference readFromDirectory(File directory) throws IOException {
- File file = new File(directory, "execution.json");
+ public static ExecutionReference readFromFile(File file) throws IOException {
if (!file.exists()) {
throw new IOException("Execution file execution.json does not exist.");
}
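+ // Directories are skipped: the old layout stored each reference as <execId>/execution.json.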
+ if (file.isDirectory()) {
+ return null;
+ }
@SuppressWarnings("unchecked")
HashMap<String, Object> obj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(file);
ExecutionReference reference = new ExecutionReference();
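Note: this change replaces the per-execution reference directory (<execId>/execution.json) with a single flat <execId>.json file. A rough round-trip sketch of that layout using only the JSONUtils calls that appear in this diff; the path, field names, and field types are illustrative assumptions, not the actual ExecutionReference schema:

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;

    import azkaban.utils.JSONUtils;

    public class ReferenceFileSketch {
        public static void main(String[] args) throws IOException {
            // One file per execution, named <execId>.json (the path here is made up).
            File refFile = new File("/tmp/1334611200000.myflow.json");

            HashMap<String, Object> obj = new HashMap<String, Object>();
            obj.put("execId", "1334611200000.myflow"); // illustrative field names
            obj.put("startTime", 1334611200000L);
            obj.put("endTime", -1L);                   // -1 marks a flow that is still running

            JSONUtils.toJSON(obj, refFile);            // same call used by writeToFile above

            @SuppressWarnings("unchecked")
            HashMap<String, Object> parsed =
                (HashMap<String, Object>) JSONUtils.parseJSONFromFile(refFile); // as in readFromFile
            System.out.println(parsed.get("execId"));
        }
    }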
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 0ac8945..727abc2 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -161,6 +161,24 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
return defaultVal;
}
+ public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+ String p = getParam(request, name);
+ return Long.valueOf(p);
+ }
+
+ public long getLongParam(HttpServletRequest request, String name, long defaultVal) {
+ if (hasParam(request, name)) {
+ try {
+ return getLongParam(request, name);
+ } catch (Exception e) {
+ return defaultVal;
+ }
+ }
+
+ return defaultVal;
+ }
+
+
public Map<String, String> getParamGroup(HttpServletRequest request, String groupName) throws ServletException {
@SuppressWarnings("unchecked")
Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
src/java/azkaban/webapp/servlet/HistoryServlet.java 108(+74 -34)
diff --git a/src/java/azkaban/webapp/servlet/HistoryServlet.java b/src/java/azkaban/webapp/servlet/HistoryServlet.java
index 4ffb6f3..6969ece 100644
--- a/src/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/src/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -1,6 +1,10 @@
package azkaban.webapp.servlet;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import javax.servlet.ServletConfig;
@@ -10,6 +14,7 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManager.ExecutionReference;
+import azkaban.utils.JSONUtils;
import azkaban.webapp.session.Session;
public class HistoryServlet extends LoginAbstractAzkabanServlet {
@@ -26,46 +31,17 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
@Override
protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
- Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/historypage.vm");
- int pageNum = getIntParam(req, "page", 1);
- int pageSize = getIntParam(req, "size", 16);
-
- if (pageNum < 0) {
- pageNum = 1;
- }
-
- List<ExecutionReference> history = executorManager.getFlowHistory(".*", pageSize, (pageNum - 1)*pageSize);
- page.add("flowHistory", history);
- page.add("size", pageSize);
- page.add("page", pageNum);
- if (pageNum == 1) {
- page.add("previous", new PageSelection(1, pageSize, true, false));
+ if (hasParam(req, "timeline")) {
+ handleHistoryTimelinePage(req, resp, session);
}
- page.add("next", new PageSelection(pageNum + 1, pageSize, false, false));
- // Now for the 5 other values.
- int pageStartValue = 1;
- if (pageNum > 3) {
- pageStartValue = pageNum - 2;
+ else {
+ handleHistoryPage(req, resp, session);
}
-
- page.add("page1", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
- pageStartValue++;
- page.add("page2", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
- pageStartValue++;
- page.add("page3", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
- pageStartValue++;
- page.add("page4", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
- pageStartValue++;
- page.add("page5", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
- pageStartValue++;
-
- page.render();
}
@Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
if (hasParam(req, "action")) {
String action = getParam(req, "action");
if (action.equals("search")) {
@@ -118,6 +94,70 @@ public class HistoryServlet extends LoginAbstractAzkabanServlet {
}
+ private void handleHistoryPage(HttpServletRequest req, HttpServletResponse resp, Session session) {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/historypage.vm");
+ int pageNum = getIntParam(req, "page", 1);
+ int pageSize = getIntParam(req, "size", 16);
+
+ if (pageNum < 0) {
+ pageNum = 1;
+ }
+
+ List<ExecutionReference> history = executorManager.getFlowHistory(".*", pageSize, (pageNum - 1)*pageSize);
+ page.add("flowHistory", history);
+ page.add("size", pageSize);
+ page.add("page", pageNum);
+
+ if (pageNum == 1) {
+ page.add("previous", new PageSelection(1, pageSize, true, false));
+ }
+ page.add("next", new PageSelection(pageNum + 1, pageSize, false, false));
+ // Now for the 5 other values.
+ int pageStartValue = 1;
+ if (pageNum > 3) {
+ pageStartValue = pageNum - 2;
+ }
+
+ page.add("page1", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page2", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page3", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page4", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page5", new PageSelection(pageStartValue, pageSize, false, pageStartValue == pageNum));
+ pageStartValue++;
+
+ page.render();
+ }
+
+ private void handleHistoryTimelinePage(HttpServletRequest req, HttpServletResponse resp, Session session) {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/historytimelinepage.vm");
+ long currentTime = System.currentTimeMillis();
+ long begin = getLongParam(req, "begin", currentTime - 86400000);
+ long end = getLongParam(req, "end", currentTime);
+
+ page.add("begin", begin);
+ page.add("end", end);
+
+ List<ExecutionReference> refs = executorManager.getFlowHistory(begin, end);
+ ArrayList<Object> refList = new ArrayList<Object>();
+ for (ExecutionReference ref: refs) {
+
+ HashMap<String,Object> refObj = new HashMap<String,Object>();
+ refObj.put("execId", ref.getExecId());
+ refObj.put("start", ref.getStartTime());
+ refObj.put("end", ref.getEndTime());
+ refObj.put("status", ref.getStatus().toString());
+
+ refList.add(refObj);
+ }
+
+ page.add("data", JSONUtils.toJSON(refList));
+ page.render();
+ }
+
public class PageSelection {
private int page;
private int size;
diff --git a/src/java/azkaban/webapp/servlet/velocity/historytimelinepage.vm b/src/java/azkaban/webapp/servlet/velocity/historytimelinepage.vm
new file mode 100644
index 0000000..8b726a9
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/historytimelinepage.vm
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html>
+ <head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+ <script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>
+ <script type="text/javascript" src="${context}/js/namespace.js"></script>
+ <script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+ <script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.main.view.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.historytimeline.view.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
+
+ var begin = $begin;
+ var end = $end;
+ var data = $data;
+ </script>
+ </head>
+ <body>
+ #set($current_page="history")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+ <div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
+
+ <div class="content">
+ <div id="all-jobs-content">
+ <div class="section-hd">
+ <h2>History Timeline</h2>
+ </div>
+ </div>
+
+ </div>
+ </body>
+</html>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
index 9f47d4b..8c4cb0b 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -42,7 +42,7 @@
<h4><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h4>
</div>
- <a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Project Logs</a>
+ <a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Job History</a>
</div>
</div>
src/web/js/azkaban.historytimeline.view.js 35(+35 -0)
diff --git a/src/web/js/azkaban.historytimeline.view.js b/src/web/js/azkaban.historytimeline.view.js
new file mode 100644
index 0000000..95829bc
--- /dev/null
+++ b/src/web/js/azkaban.historytimeline.view.js
@@ -0,0 +1,35 @@
+$.namespace('azkaban');
+
+var timelineModel;
+var jobLogView;
+azkaban.TimelineModel = Backbone.Model.extend({});
+
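+// Stub Backbone view; timeline rendering of the begin/end/data page globals is not implemented yet.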
+azkaban.JobLogView = Backbone.View.extend({
+ events: {
+ },
+ initialize: function(settings) {
+ }
+});
+
+var showDialog = function(title, message) {
+ $('#messageTitle').text(title);
+
+ $('#messageBox').text(message);
+
+ $('#messageDialog').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%"],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ }
+ });
+};
+
+
+$(function() {
+ timelineModel = new azkaban.TimelineModel();
+ jobLogView = new azkaban.JobLogView({el:$('#jobLogView'), model: timelineModel});
+});