azkaban-aplcache
Changes
src/java/azkaban/executor/ExecutableFlow.java 199(+180 -19)
src/java/azkaban/executor/ExecutorManager.java 164(+158 -6)
src/java/azkaban/flow/Node.java 14(+7 -7)
src/web/css/azkaban.css 48(+48 -0)
src/web/js/azkaban.exflow.view.js 484(+484 -0)
src/web/js/azkaban.flow.view.js 228(+206 -22)
Details
src/java/azkaban/executor/ExecutableFlow.java 199(+180 -19)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index a43833f..9e37355 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -19,11 +19,19 @@ public class ExecutableFlow {
private String projectId;
private String executionPath;
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
- private HashMap<String, ExecutableNode> executableNodes;
+ private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
private ArrayList<String> startNodes = new ArrayList<String>();
+ private long submitTime = -1;
+ private long startTime = -1;
+ private long endTime = -1;
+
+ private Status flowStatus = Status.UNKNOWN;
+
+ private String submitUser;
+
public enum Status {
- FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
+ FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY, UNKNOWN
}
public ExecutableFlow(String id, Flow flow) {
@@ -34,9 +42,14 @@ public class ExecutableFlow {
this.setFlow(flow);
}
+ public ExecutableFlow() {
+ }
+
+ public List<ExecutableNode> getExecutableNodes() {
+ return new ArrayList<ExecutableNode>(executableNodes.values());
+ }
+
private void setFlow(Flow flow) {
- executableNodes = new HashMap<String, ExecutableNode>();
-
for (Node node: flow.getNodes()) {
String id = node.getId();
ExecutableNode exNode = new ExecutableNode(node);
@@ -100,14 +113,51 @@ public class ExecutableFlow {
public void setExecutionPath(String executionPath) {
this.executionPath = executionPath;
}
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long time) {
+ this.startTime = time;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long time) {
+ this.endTime = time;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public void setSubmitTime(long time) {
+ this.submitTime = time;
+ }
+
+ public Status getStatus() {
+ return flowStatus;
+ }
+ public void setStatus(Status flowStatus) {
+ this.flowStatus = flowStatus;
+ }
+
public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
- flowObj.put("execution.id", executionId);
- flowObj.put("execution.path", executionPath);
- flowObj.put("flow.id", flowId);
- flowObj.put("project.id", projectId);
+ flowObj.put("executionId", executionId);
+ flowObj.put("executionPath", executionPath);
+ flowObj.put("flowId", flowId);
+ flowObj.put("projectId", projectId);
+ flowObj.put("submitTime", submitTime);
+ flowObj.put("startTime", startTime);
+ flowObj.put("endTime", endTime);
+ flowObj.put("status", flowStatus.toString());
+ flowObj.put("submitUser", submitUser);
ArrayList<Object> nodes = new ArrayList<Object>();
for (ExecutableNode node: executableNodes.values()) {
@@ -118,6 +168,60 @@ public class ExecutableFlow {
return flowObj;
}
+ @SuppressWarnings("unchecked")
+ public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
+ ExecutableFlow exFlow = new ExecutableFlow();
+
+ HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+ exFlow.executionId = (String)flowObj.get("executionId");
+ exFlow.executionPath = (String)flowObj.get("executionPath");
+ exFlow.flowId = (String)flowObj.get("flowId");
+ exFlow.projectId = (String)flowObj.get("projectId");
+ exFlow.submitTime = getLongFromObject(flowObj.get("submitTime"));
+ exFlow.startTime = getLongFromObject(flowObj.get("startTime"));
+ exFlow.endTime = getLongFromObject(flowObj.get("endTime"));
+ exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
+ exFlow.submitUser = (String)flowObj.get("submitUser");
+
+ List<Object> nodes = (List<Object>)flowObj.get("nodes");
+ for (Object nodeObj: nodes) {
+ ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj);
+ exFlow.executableNodes.put(node.getId(), node);
+ }
+
+ return exFlow;
+ }
+
+ private static long getLongFromObject(Object obj) {
+ if (obj instanceof Integer) {
+ return Long.valueOf((Integer)obj);
+ }
+
+ return (Long)obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void updateExecutableFlowFromObject(Object obj) {
+ HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+
+ submitTime = (Long)flowObj.get("submitTime");
+ startTime = (Long)flowObj.get("startTime");
+ endTime = (Long)flowObj.get("endTime");
+ flowStatus = Status.valueOf((String)flowObj.get("status"));
+
+ List<Object> nodes = (List<Object>)flowObj.get("nodes");
+ for (Object nodeObj: nodes) {
+ HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
+ String nodeId = (String)nodeHash.get("id");
+ ExecutableNode node = executableNodes.get(nodeId);
+ if (nodeId == null) {
+ throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
+ }
+
+ node.updateNodeFromObject(nodeObj);
+ }
+ }
+
public Set<String> getSources() {
HashSet<String> set = new HashSet<String>();
for (ExecutableNode exNode: executableNodes.values()) {
@@ -130,15 +234,25 @@ public class ExecutableFlow {
return set;
}
- private static class ExecutableNode {
+ public String getSubmitUser() {
+ return submitUser;
+ }
+
+ public void setSubmitUser(String submitUser) {
+ this.submitUser = submitUser;
+ }
+
+ public static class ExecutableNode {
private String id;
+
private String type;
private String jobPropsSource;
private String inheritPropsSource;
- private Status status;
+ private Status status = Status.READY;
private long startTime = -1;
private long endTime = -1;
-
+ private int level = 0;
+
private Set<String> inNodes = new HashSet<String>();
private Set<String> outNodes = new HashSet<String>();
@@ -148,6 +262,19 @@ public class ExecutableFlow {
jobPropsSource = node.getJobSource();
inheritPropsSource = node.getPropsSource();
status = Status.READY;
+ level = node.getLevel();
+ }
+
+ private ExecutableNode() {
+
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
}
public void addInNode(String exNode) {
@@ -177,18 +304,48 @@ public class ExecutableFlow {
public Object toObject() {
HashMap<String, Object> objMap = new HashMap<String, Object>();
objMap.put("id", id);
- objMap.put("job.source", jobPropsSource);
- objMap.put("prop.source", inheritPropsSource);
- objMap.put("job.type", type);
+ objMap.put("jobSource", jobPropsSource);
+ objMap.put("propSource", inheritPropsSource);
+ objMap.put("jobType", type);
objMap.put("status", status.toString());
- objMap.put("in.nodes", inNodes);
- objMap.put("out.nodes", outNodes);
- objMap.put("start.time", startTime);
- objMap.put("end.time", endTime);
-
+ objMap.put("inNodes", inNodes);
+ objMap.put("outNodes", outNodes);
+ objMap.put("startTime", startTime);
+ objMap.put("endTime", endTime);
+ objMap.put("level", level);
return objMap;
}
+ @SuppressWarnings("unchecked")
+ public static ExecutableNode createNodeFromObject(Object obj) {
+ ExecutableNode exNode = new ExecutableNode();
+
+ HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
+ exNode.id = (String)objMap.get("id");
+ exNode.jobPropsSource = (String)objMap.get("jobSource");
+ exNode.inheritPropsSource = (String)objMap.get("propSource");
+ exNode.type = (String)objMap.get("jobType");
+ exNode.status = Status.valueOf((String)objMap.get("status"));
+
+ exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
+ exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
+
+ exNode.startTime = getLongFromObject(objMap.get("startTime"));
+ exNode.endTime = getLongFromObject(objMap.get("endTime"));
+ exNode.level = (Integer)objMap.get("level");
+
+ return exNode;
+ }
+
+ @SuppressWarnings("unused")
+ public void updateNodeFromObject(Object obj) {
+ HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
+ status = Status.valueOf((String)objMap.get("status"));
+
+ startTime = (Long)objMap.get("startTime");
+ endTime = (Long)objMap.get("endTime");
+ }
+
public long getStartTime() {
return startTime;
}
@@ -212,5 +369,9 @@ public class ExecutableFlow {
public String getPropsSource() {
return inheritPropsSource;
}
+
+ public int getLevel() {
+ return level;
+ }
}
}
src/java/azkaban/executor/ExecutorManager.java 164(+158 -6)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 21cf40c..ed0de8d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -2,14 +2,17 @@ package azkaban.executor;
import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -23,8 +26,8 @@ public class ExecutorManager {
private static String FLOW_PATH = "flows";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private File basePath;
-
- private AtomicLong counter = new AtomicLong();
+
+ private AtomicInteger counter = new AtomicInteger();
private String token;
private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
@@ -43,6 +46,116 @@ public class ExecutorManager {
token = props.getString("executor.shared.token", "");
counter.set(0);
+ loadActiveExecutions();
+ }
+
+ public List<ExecutableFlow> getExecutableFlowByProject(String projectId, int from, int maxResults) {
+ File activeFlows = new File(basePath, projectId + File.separatorChar + "active");
+
+ if (!activeFlows.exists()) {
+ return Collections.emptyList();
+ }
+
+ File[] executionFiles = activeFlows.listFiles();
+ if (executionFiles.length == 0 || from >= executionFiles.length) {
+ return Collections.emptyList();
+ }
+
+ Arrays.sort(executionFiles);
+
+ ArrayList<ExecutableFlow> executionFlows = new ArrayList<ExecutableFlow>();
+
+ int index = (executionFiles.length - from - 1);
+ for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
+ File exDir = executionFiles[index];
+ ExecutableFlow flow = loadExecutableFlowFromDir(exDir);
+
+ if (flow != null) {
+ executionFlows.add(flow);
+ }
+ else {
+ logger.info("Skipping loading " + exDir + ". Couldn't load execution.");
+ }
+ }
+
+ return executionFlows;
+ }
+
+ public int getExecutableFlowByProjectFlow(String projectId, String flowName, int from, int maxResults, List<ExecutableFlow> results) {
+ File activeFlows = new File(basePath, projectId + File.separatorChar + "active");
+
+ if (!activeFlows.exists()) {
+ return 0;
+ }
+
+ File[] executionFiles = activeFlows.listFiles(new SuffixFilter(flowName, false));
+ //File[] executionFiles = activeFlows.listFiles();
+ if (executionFiles.length == 0 || from >= executionFiles.length) {
+ return 0;
+ }
+ Arrays.sort(executionFiles);
+
+ int count = 0;
+ for (int index = executionFiles.length - from - 1; count < maxResults && index>=0; --index ) {
+ File exDir = executionFiles[index];
+ ExecutableFlow flow = loadExecutableFlowFromDir(exDir);
+
+ if (flow != null) {
+ results.add(flow);
+ count++;
+ }
+ else {
+ logger.info("Skipping loading " + exDir + ". Couldn't load execution.");
+ }
+ }
+
+ return executionFiles.length;
+ }
+
+ private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+ logger.info("Loading execution " + exDir.getName());
+ String exFlowName = exDir.getName();
+
+ String flowFileName = "_" + exFlowName + ".flow";
+ File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
+ Arrays.sort(exFlowFiles);
+
+ if (exFlowFiles.length <= 0) {
+ logger.error("Execution flow " + exFlowName + " missing flow file.");
+ return null;
+ }
+ File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+
+ Object exFlowObj = null;
+ try {
+ exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+ } catch (IOException e) {
+ logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
+ return null;
+ }
+
+ ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+ return flow;
+ }
+
+ private void loadActiveExecutions() {
+ File[] executingProjects = basePath.listFiles();
+ for (File project: executingProjects) {
+ File activeFlows = new File(project, "active");
+ if (!activeFlows.exists()) {
+ continue;
+ }
+
+ for (File exflow: activeFlows.listFiles()) {
+ logger.info("Loading execution " + exflow.getName());
+ ExecutableFlow flow = loadExecutableFlowFromDir(exflow);
+
+ if (flow != null) {
+ logger.info("Adding active execution flow " + flow.getExecutionId());
+ runningFlows.put(flow.getExecutionId(), flow);
+ }
+ }
+ }
}
public synchronized ExecutableFlow createExecutableFlow(Flow flow) {
@@ -54,8 +167,10 @@ public class ExecutorManager {
// Find execution
File executionDir;
String executionId;
+ int count = counter.getAndIncrement();
+ String countString = String.format("%05d", count);
do {
- executionId = String.valueOf(System.currentTimeMillis()) + "." + id;
+ executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + id;
executionDir = new File(projectExecutionDir, executionId);
}
while(executionDir.exists());
@@ -65,8 +180,8 @@ public class ExecutorManager {
}
public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
- String path = exflow.getExecutionId();
- String projectFlowDir = exflow.getProjectId() + File.separator + path;
+ String executionId = exflow.getExecutionId();
+ String projectFlowDir = exflow.getProjectId() + File.separator + "active" + File.separator + executionId;
File executionPath = new File(basePath, projectFlowDir);
if (executionPath.exists()) {
throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
@@ -74,6 +189,13 @@ public class ExecutorManager {
executionPath.mkdirs();
exflow.setExecutionPath(executionPath.getPath());
+ runningFlows.put(executionId, exflow);
+ }
+
+ public synchronized ExecutableFlow getExecutableFlow(String flowId) throws ExecutorManagerException {
+ ExecutableFlow flow = runningFlows.get(flowId);
+
+ return flow;
}
public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -275,4 +397,34 @@ public class ExecutorManager {
private String createUniqueId(String projectId, String flowId) {
return null;
}
+
+ private static class PrefixFilter implements FileFilter {
+ private String prefix;
+
+ public PrefixFilter(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
+ }
+ }
+
+ private static class SuffixFilter implements FileFilter {
+ private String suffix;
+ private boolean filesOnly = false;
+
+ public SuffixFilter(String suffix, boolean filesOnly) {
+ this.suffix = suffix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+ return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
+ }
+ }
}
src/java/azkaban/flow/Node.java 14(+7 -7)
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index 2192468..43e1b51 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -93,13 +93,13 @@ public class Node {
String id = (String)mapObj.get("id");
Node node = new Node(id);
- String jobSource = (String)mapObj.get("job.source");
- String propSource = (String)mapObj.get("prop.source");
- String typeSource = (String)mapObj.get("job.type");
+ String jobSource = (String)mapObj.get("jobSource");
+ String propSource = (String)mapObj.get("propSource");
+ String jobType = (String)mapObj.get("jobType");
node.setJobSource(jobSource);
node.setPropsSource(propSource);
- node.setType(typeSource);
+ node.setType(jobType);
Integer expectedRuntime = (Integer)mapObj.get("expectedRuntime");
if (expectedRuntime != null) {
@@ -135,9 +135,9 @@ public class Node {
public Object toObject() {
HashMap<String, Object> objMap = new HashMap<String, Object>();
objMap.put("id", id);
- objMap.put("job.source", jobSource);
- objMap.put("prop.source", propsSource);
- objMap.put("job.type", type);
+ objMap.put("jobSource", jobSource);
+ objMap.put("propSource", propsSource);
+ objMap.put("jobType", type);
objMap.put("expectedRuntime", expectedRunTimeSec);
HashMap<String, Object> layoutInfo = new HashMap<String, Object>();
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 5b81b33..5c6fd25 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -138,7 +138,7 @@ public class Project {
ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
for (Map.Entry<String, Permission> entry : userToPermission.entrySet()) {
HashMap<String, Object> userMap = new HashMap<String, Object>();
- userMap.put("userid", entry.getKey());
+ userMap.put("userId", entry.getKey());
userMap.put("permissions", entry.getValue().toStringArray());
users.add(userMap);
}
@@ -171,7 +171,7 @@ public class Project {
.get("users");
for (Map<String, Object> user : users) {
- String userid = (String) user.get("userid");
+ String userid = (String) user.get("userId");
Permission perm = new Permission();
List<String> list = (List<String>) user.get("permissions");
perm.addPermissionsByName(list);
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 9463f12..038ee96 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -2,6 +2,8 @@ package azkaban.webapp.servlet;
import java.io.File;
import java.io.IOException;
+import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -11,10 +13,12 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
+import azkaban.flow.Node;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
@@ -42,8 +46,50 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
}
+ else if (hasParam(req, "execid")) {
+ handleExecutionFlowPage(req, resp, session);
+ }
}
+ private void handleExecutionFlowPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/executingflowpage.vm");
+ User user = session.getUser();
+ String execId = getParam(req, "execid");
+ page.add("execid", execId);
+
+ ExecutableFlow flow = null;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ if (flow == null) {
+ page.add("errorMsg", "Error loading executing flow " + execId + " not found.");
+ page.render();
+ return;
+ }
+ } catch (ExecutorManagerException e) {
+ page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+ page.render();
+ return;
+ }
+
+ String projectId = flow.getProjectId();
+ Project project = null;
+ try {
+ project = projectManager.getProject(flow.getProjectId(), user);
+ } catch (AccessControlException e) {
+ page.add("errorMsg", "Do not have permission to view '" + flow.getExecutionId() + "'.");
+ page.render();
+ }
+
+ if (project == null) {
+ page.add("errorMsg", "Project " + projectId + " not found.");
+ }
+
+ page.add("projectName", projectId);
+ page.add("flowid", flow.getFlowId());
+
+ page.render();
+ }
+
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
if (hasParam(req, "ajax")) {
@@ -52,19 +98,77 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
}
private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- String projectName = getParam(req, "project");
-
HashMap<String, Object> ret = new HashMap<String, Object>();
- ret.put("project", projectName);
-
String ajaxName = getParam(req, "ajax");
- if (ajaxName.equals("executeFlow")) {
- ajaxExecuteFlow(req, resp, ret, session.getUser());
- }
+ if (hasParam(req, "execid")) {
+ if (ajaxName.equals("fetchexecflow")) {
+ ajaxFetchExecutableFlow(req, resp, ret, session.getUser());
+ }
+ }
+ else {
+ String projectName = getParam(req, "project");
+
+ ret.put("project", projectName);
+ if (ajaxName.equals("executeFlow")) {
+ ajaxExecuteFlow(req, resp, ret, session.getUser());
+ }
+ }
this.writeJSON(resp, ret);
}
+ private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+ String execid = getParam(req, "execid");
+ System.out.println("Fetching " + execid);
+ ExecutableFlow exFlow = null;
+ try {
+ exFlow = executorManager.getExecutableFlow(execid);
+ } catch (ExecutorManagerException e) {
+ ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
+ }
+ if (exFlow == null) {
+ ret.put("error", "Cannot find execution '" + execid + "'");
+ return;
+ }
+
+ Project project = null;
+ try {
+ project = projectManager.getProject(exFlow.getProjectId(), user);
+ }
+ catch (AccessControlException e) {
+ ret.put("error", "Permission denied. User " + user.getUserId() + " doesn't have permissions to view project " + project.getName());
+ return;
+ }
+
+ ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+ ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String,Object>>();
+ for (ExecutableNode node : exFlow.getExecutableNodes()) {
+ HashMap<String, Object> nodeObj = new HashMap<String,Object>();
+ nodeObj.put("id", node.getId());
+ nodeObj.put("level", node.getLevel());
+ nodeObj.put("status", node.getStatus());
+ nodeObj.put("startTime", node.getStartTime());
+ nodeObj.put("endTime", node.getEndTime());
+
+ nodeList.add(nodeObj);
+
+ // Add edges
+ for (String out: node.getOutNodes()) {
+ HashMap<String, Object> edgeObj = new HashMap<String,Object>();
+ edgeObj.put("from", node.getId());
+ edgeObj.put("target", out);
+ edgeList.add(edgeObj);
+ }
+ }
+
+ ret.put("nodes", nodeList);
+ ret.put("edges", edgeList);
+ ret.put("startTime", exFlow.getStartTime());
+ ret.put("endTime", exFlow.getEndTime());
+ ret.put("submitTime", exFlow.getSubmitTime());
+ ret.put("submitUser", exFlow.getSubmitUser());
+ }
+
private void ajaxExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
String projectId = getParam(req, "project");
String flowId = getParam(req, "flow");
@@ -99,6 +203,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
// Create ExecutableFlow
ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
+ exflow.setSubmitUser(user.getUserId());
Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index c2cc965..c6affe7 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -30,6 +30,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.Node;
@@ -52,7 +54,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
private static final NodeLevelComparator NODE_LEVEL_COMPARATOR = new NodeLevelComparator();
- private ProjectManager manager;
+ private ProjectManager projectManager;
+ private ExecutorManager executorManager;
private MultipartParser multipartParser;
private File tempDir;
private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
@@ -65,7 +68,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
- manager = this.getApplication().getProjectManager();
+ projectManager = this.getApplication().getProjectManager();
+ executorManager = this.getApplication().getExecutorManager();
+
tempDir = this.getApplication().getTempDirectory();
multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
}
@@ -125,7 +130,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
try {
- project = manager.getProject(projectName, user);
+ project = projectManager.getProject(projectName, user);
} catch (Exception e) {
ret.put("error", e.getMessage());
this.writeJSON(resp, ret);
@@ -140,17 +145,17 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
else if (ajaxName.equals("fetchflowgraph")) {
if (handleAjaxPermission(project, user, Type.READ, ret)) {
- ajaxFetchFlowGraph(project, ret, req, resp);
+ ajaxFetchFlowGraph(project, ret, req);
}
}
else if (ajaxName.equals("fetchprojectflows")) {
if (handleAjaxPermission(project, user, Type.READ, ret)) {
- ajaxFetchProjectFlows(project, ret, req, resp);
+ ajaxFetchProjectFlows(project, ret, req);
}
}
else if (ajaxName.equals("changeDescription")) {
if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
- ajaxChangeDescription(project, ret, req, resp);
+ ajaxChangeDescription(project, ret, req);
}
}
else if (ajaxName.equals("getPermissions")) {
@@ -168,6 +173,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ajaxAddUserPermission(project, ret, req);
}
}
+ else if (ajaxName.equals("fetchFlowExecutions")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxFetchFlowExecutions(project, ret, req);
+ }
+ }
this.writeJSON(resp, ret);
}
@@ -181,18 +191,49 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return false;
}
- private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxFetchFlowExecutions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ String flowId = getParam(req, "flow");
+ int from = Integer.valueOf(getParam(req, "start"));
+ int length = Integer.valueOf(getParam(req, "length"));
+
+ ArrayList<ExecutableFlow> exFlows = new ArrayList<ExecutableFlow>();
+ int total = executorManager.getExecutableFlowByProjectFlow(project.getName(), flowId, from, length, exFlows);
+
+ ret.put("flow", flowId);
+ ret.put("total", total);
+ ret.put("from", from);
+ ret.put("length", length);
+
+ ArrayList<Object> history = new ArrayList<Object>();
+ for (ExecutableFlow flow: exFlows) {
+ HashMap<String, Object> flowInfo = new HashMap<String, Object>();
+ flowInfo.put("execId", flow.getExecutionId());
+ flowInfo.put("flowId", flow.getFlowId());
+ flowInfo.put("projectId", flow.getProjectId());
+ flowInfo.put("status", flow.getStatus().toString());
+ flowInfo.put("submitTime", flow.getSubmitTime());
+ flowInfo.put("startTime", flow.getStartTime());
+ flowInfo.put("endTime", flow.getEndTime());
+ flowInfo.put("submitUser", flow.getSubmitUser());
+
+ history.add(flowInfo);
+ }
+
+ ret.put("executions", history);
+ }
+
+ private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
String description = getParam(req, "description");
project.setDescription(description);
try {
- manager.commitProject(project.getName());
+ projectManager.commitProject(project.getName());
} catch (ProjectManagerException e) {
ret.put("error", e.getMessage());
}
}
- private void ajaxFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
ArrayList<Map<String,Object>> flowList = new ArrayList<Map<String,Object>>();
for (Flow flow: project.getFlows()) {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
@@ -203,7 +244,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("flows", flowList);
}
- private void ajaxFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
String flowId = getParam(req, "flow");
Flow flow = project.getFlow(flowId);
@@ -212,8 +253,6 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
for (Node node: flow.getNodes()) {
HashMap<String, Object> nodeObj = new HashMap<String,Object>();
nodeObj.put("id", node.getId());
- nodeObj.put("x", node.getPosition().getX());
- nodeObj.put("y", node.getPosition().getY());
nodeObj.put("level", node.getLevel());
nodeList.add(nodeObj);
@@ -228,20 +267,20 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
if (edge.hasError()) {
edgeObj.put("error", edge.getError());
}
- if (edge.getGuideValues() != null) {
- List<Point2D> guides = edge.getGuideValues();
- ArrayList<Object> guideOutput = new ArrayList<Object>();
- for (Point2D guide: guides) {
- double x = guide.getX();
- double y = guide.getY();
- HashMap<String, Double> point = new HashMap<String, Double>();
- point.put("x", x);
- point.put("y", y);
- guideOutput.add(point);
- }
-
- edgeObj.put("guides", guideOutput);
- }
+// if (edge.getGuideValues() != null) {
+// List<Point2D> guides = edge.getGuideValues();
+// ArrayList<Object> guideOutput = new ArrayList<Object>();
+// for (Point2D guide: guides) {
+// double x = guide.getX();
+// double y = guide.getY();
+// HashMap<String, Double> point = new HashMap<String, Double>();
+// point.put("x", x);
+// point.put("y", y);
+// guideOutput.add(point);
+// }
+//
+// edgeObj.put("guides", guideOutput);
+// }
edgeList.add(edgeObj);
}
@@ -320,7 +359,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
project.setUserPermission(username, perm);
try {
- manager.commitProject(project.getName());
+ projectManager.commitProject(project.getName());
} catch (ProjectManagerException e) {
ret.put("error", e.getMessage());
}
@@ -351,7 +390,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
perm.setPermission(Type.SCHEDULE, schedule);
}
try {
- manager.commitProject(project.getName());
+ projectManager.commitProject(project.getName());
} catch (ProjectManagerException e) {
ret.put("error", e.getMessage());
}
@@ -378,7 +417,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
try {
- project = manager.getProject(projectName, user);
+ project = projectManager.getProject(projectName, user);
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
@@ -411,7 +450,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
Flow flow = null;
try {
- project = manager.getProject(projectName, user);
+ project = projectManager.getProject(projectName, user);
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
@@ -431,7 +470,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.add("errorMsg", "Job " + jobName + " not found.");
}
else {
- Props prop = manager.getProperties(projectName, node.getJobSource(), user);
+ Props prop = projectManager.getProperties(projectName, node.getJobSource(), user);
page.add("jobid", node.getId());
page.add("jobtype", node.getType());
@@ -491,7 +530,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
Flow flow = null;
try {
- project = manager.getProject(projectName, user);
+ project = projectManager.getProject(projectName, user);
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
@@ -520,7 +559,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
User user = session.getUser();
Project project = null;
try {
- project = manager.getProject(projectName, user);
+ project = projectManager.getProject(projectName, user);
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
@@ -558,7 +597,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
String message = null;
HashMap<String, Object> params = null;
try {
- manager.createProject(projectName, projectDescription, user);
+ projectManager.createProject(projectName, projectDescription, user);
status = "success";
action = "redirect";
String redirect = "manager?project=" + projectName;
@@ -595,7 +634,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
else {
try {
projectDir = extractFile(item);
- manager.uploadProject(projectName, projectDir, user, force);
+ projectManager.uploadProject(projectName, projectDir, user, force);
setSuccessMessageInCookie(resp, "Project Uploaded");
}
catch (Exception e) {
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index e880ce6..6f66fb4 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -11,7 +11,7 @@
<script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
- <script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.exflow.view.js"></script>
<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
@@ -20,8 +20,9 @@
var errorMessage = "${error_message}";
var successMessage = "${success_message}";
- var projectName = "${project.name}";
+ var projectName = "${projectName}";
var flowName = "${flowid}";
+ var execId = "${execid}";
</script>
<link rel="stylesheet" type="text/css" href="${context}/css/jquery.contextMenu.custom.css" />
</head>
@@ -41,9 +42,11 @@
<div id="all-jobs-content">
<div class="section-hd">
- <h2><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h2>
+ <h2><a href="${context}/executor?execid=${execid}">Execution <span>$execid</span></a></h2>
<div class="section-sub-hd">
- <h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
+ <h4><a href="${context}/manager?project=${projectName}">Project <span>$projectName</span></a></h4>
+ <h4 class="separator">></h4>
+ <h4><a href="${context}/manager?project=${projectName}&flow=${flowid}">Flow <span>$flowid</span></a></h4>
</div>
</div>
@@ -78,11 +81,6 @@
<ul id="jobMenu" class="contextMenu">
<li class="open"><a href="#open">Open...</a></li>
<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
- <li class="disable"><a href="#disable">Disable</a></li>
- <li class="disableancestors"><a href="#disableancestors">Disable All Ancestors</a></li>
- <li class="disabledecendents"><a href="#disabledecendents">Disable Decendents</a></li>
- <li class="enableancestors"><a href="#enableancestors">Enable All Ancestors</a></li>
- <li class="disabledecendents"><a href="#disabledecendents">Disable Decendents</a></li>
</ul>
</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index df2c192..6088ec9 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -54,7 +54,7 @@
<ul>
<li><a id="graphViewLink" href="#graph">Graph</a></li>
<li class="lidivider">|</li>
- <li><a id="jobslistViewLink" href="#jobslist">Job List</a></li>
+ <li><a id="executionsViewLink" href="#executions">Executions</a></li>
</ul>
</div>
<div id="graphView">
@@ -73,8 +73,36 @@
</div>
</div>
</div>
- <div id="jobListView">
- <p>This is my joblist view</p>
+ <div id="executionsView">
+ <div id="executionDiv">
+ <table id="execTable">
+ <thead>
+ <tr>
+ <th>Execution Id</th>
+ <th>User</th>
+ <th>Start Time</th>
+ <th>End Time</th>
+ <th>Elapsed</th>
+ <th>Status</th>
+ <th>Action</th>
+ </tr>
+ </thead>
+ <tbody id="execTableBody">
+ </tbody>
+ </table>
+ </div>
+
+ <div id="pageSelection">
+ <ul>
+ <li id="previous" class="first"><a><span class="arrow">←</span>Previous</a></li>
+ <li id="page1"><a href="#page1">1</a></li>
+ <li id="page2"><a href="#page2">2</a></li>
+ <li id="page3"><a href="#page3">3</a></li>
+ <li id="page4"><a href="#page4">4</a></li>
+ <li id="page5"><a href="#page5">5</a></li>
+ <li id="next"><a>Next<span class="arrow">→</span></a></li>
+ </ul>
+ </div>
</div>
</div>
#end
src/web/css/azkaban.css 48(+48 -0)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 309536b..2c663cc 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1259,6 +1259,54 @@ span.sublabel {
padding-left: 12px;
}
+#pageSelection {
+}
+
+#pageSelection ul {
+}
+
+#pageSelection ul li.first {
+ border-left: 1px solid #CCC;
+}
+
+
+#pageSelection ul li {
+ float: left;
+ background-color: #FFF;
+ border-top: 1px solid #CCC;
+ border-bottom: 1px solid #CCC;
+ border-right: 1px solid #CCC;
+ cursor: pointer;
+ padding-top: 8px;
+ padding-bottom: 8px;
+ font-size: 11pt;
+ text-decoration: none;
+ color: #3398cc;
+}
+
+#pageSelection ul li.disabled {
+ cursor: default;
+ color: #CCC;
+}
+
+#pageSelection ul li a{
+ font-size: 11pt;
+ text-decoration: none;
+ color: inherit;
+ cursor: inherit;
+ padding: 8px 12px;
+}
+
+#pageSelection ul li .arrow{
+ top: 0px;
+ margin: 0px 6px;
+}
+
+#pageSelection ul li.selected{
+ background-color: #c7eeff;
+}
+
+
/* old styles */
.azkaban-charts .hitarea {
src/web/js/azkaban.exflow.view.js 484(+484 -0)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
new file mode 100644
index 0000000..3b3e1e1
--- /dev/null
+++ b/src/web/js/azkaban.exflow.view.js
@@ -0,0 +1,484 @@
+$.namespace('azkaban');
+
+var handleJobMenuClick = function(action, el, pos) {
+ var jobid = el[0].jobid;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
+ if (action == "open") {
+ window.location.href = requestURL;
+ }
+ else if(action == "openwindow") {
+ window.open(requestURL);
+ }
+}
+
+function hasClass(el, name)
+{
+ var classes = el.getAttribute("class");
+ if (classes == null) {
+ return false;
+ }
+ return new RegExp('(\\s|^)'+name+'(\\s|$)').test(classes);
+}
+
+function addClass(el, name)
+{
+ if (!hasClass(el, name)) {
+ var classes = el.getAttribute("class");
+ classes += classes ? ' ' + name : '' +name;
+ el.setAttribute("class", classes);
+ }
+}
+
+function removeClass(el, name)
+{
+ if (hasClass(el, name)) {
+ var classes = el.getAttribute("class");
+ el.setAttribute("class", classes.replace(new RegExp('(\\s|^)'+name+'(\\s|$)'),' ').replace(/^\s+|\s+$/g, ''));
+ }
+}
+
+var flowTabView;
+azkaban.FlowTabView= Backbone.View.extend({
+ events : {
+ "click #graphViewLink" : "handleGraphLinkClick",
+ "click #jobslistViewLink" : "handleJobslistLinkClick"
+ },
+ initialize : function(settings) {
+ var selectedView = settings.selectedView;
+ if (selectedView == "jobslist") {
+ this.handleJobslistLinkClick();
+ }
+ else {
+ this.handleGraphLinkClick();
+ }
+
+ },
+ render: function() {
+ console.log("render graph");
+ },
+ handleGraphLinkClick: function(){
+ $("#jobslistViewLink").removeClass("selected");
+ $("#graphViewLink").addClass("selected");
+
+ $("#jobListView").hide();
+ $("#graphView").show();
+ },
+ handleJobslistLinkClick: function() {
+ $("#graphViewLink").removeClass("selected");
+ $("#jobslistViewLink").addClass("selected");
+
+ $("#graphView").hide();
+ $("#jobListView").show();
+ }
+});
+
+var jobListView;
+azkaban.JobListView = Backbone.View.extend({
+ events: {
+ "keyup input": "filterJobs",
+ "click li": "handleJobClick",
+ "click #resetPanZoomBtn" : "handleResetPanZoom"
+ },
+ initialize: function(settings) {
+ this.model.bind('change:selected', this.handleSelectionChange, this);
+ this.model.bind('change:graph', this.render, this);
+ },
+ filterJobs: function(self) {
+ var filter = $("#filter").val();
+
+ if (filter && filter.trim() != "") {
+ filter = filter.trim();
+
+ if (filter == "") {
+ if (this.filter) {
+ $("#jobs").children().each(
+ function(){
+ var a = $(this).find("a");
+ $(a).html(this.jobid);
+ $(this).show();
+ }
+ );
+ }
+
+ this.filter = null;
+ return;
+ }
+ }
+ else {
+ if (this.filter) {
+ $("#jobs").children().each(
+ function(){
+ var a = $(this).find("a");
+ $(a).html(this.jobid);
+ $(this).show();
+ }
+ );
+ }
+
+ this.filter = null;
+ return;
+ }
+
+ $("#jobs").children().each(
+ function(){
+ var jobid = this.jobid;
+ var index = jobid.indexOf(filter);
+ if (index != -1) {
+ var a = $(this).find("a");
+
+ var endIndex = index + filter.length;
+ var newHTML = jobid.substring(0, index) + "<span>" + jobid.substring(index, endIndex) + "</span>" + jobid.substring(endIndex, jobid.length);
+
+ $(a).html(newHTML);
+ $(this).show();
+ }
+ else {
+ $(this).hide();
+ }
+ });
+
+ this.filter = filter;
+ },
+ render: function(self) {
+ var data = this.model.get("data");
+ var nodes = data.nodes;
+ var edges = data.edges;
+
+ this.listNodes = {};
+ if (nodes.length == 0) {
+ console.log("No results");
+ return;
+ };
+
+ var nodeArray = nodes.slice(0);
+ nodeArray.sort(function(a,b){
+ var diff = a.y - b.y;
+ if (diff == 0) {
+ return a.x - b.x;
+ }
+ else {
+ return diff;
+ }
+ });
+
+ var ul = document.createElement("ul");
+ $(ul).attr("id", "jobs");
+ for (var i = 0; i < nodeArray.length; ++i) {
+ var li = document.createElement("li");
+ var a = document.createElement("a");
+ $(a).text(nodeArray[i].id);
+ li.appendChild(a);
+ ul.appendChild(li);
+ li.jobid=nodeArray[i].id;
+
+ $(li).contextMenu({
+ menu: 'jobMenu'
+ },
+ handleJobMenuClick
+ );
+
+ this.listNodes[nodeArray[i].id] = li;
+ }
+
+ $("#list").append(ul);
+ },
+ handleJobClick : function(evt) {
+ var jobid = evt.currentTarget.jobid;
+ if(!evt.currentTarget.jobid) {
+ return;
+ }
+
+ if (this.model.has("selected")) {
+ var selected = this.model.get("selected");
+ if (selected == jobid) {
+ this.model.unset("selected");
+ }
+ else {
+ this.model.set({"selected": jobid});
+ }
+ }
+ else {
+ this.model.set({"selected": jobid});
+ }
+ },
+ handleSelectionChange: function(evt) {
+ if (!this.model.hasChanged("selected")) {
+ return;
+ }
+
+ var previous = this.model.previous("selected");
+ var current = this.model.get("selected");
+
+ if (previous) {
+ $(this.listNodes[previous]).removeClass("selected");
+ }
+
+ if (current) {
+ $(this.listNodes[current]).addClass("selected");
+ }
+ },
+ handleResetPanZoom: function(evt) {
+ this.model.trigger("resetPanZoom");
+ }
+});
+
+var svgGraphView;
+azkaban.SvgGraphView = Backbone.View.extend({
+ events: {
+ "click g" : "clickGraph"
+ },
+ initialize: function(settings) {
+ this.model.bind('change:selected', this.changeSelected, this);
+ this.model.bind('change:graph', this.render, this);
+ this.model.bind('resetPanZoom', this.resetPanZoom, this);
+
+ this.svgns = "http://www.w3.org/2000/svg";
+ this.xlinksn = "http://www.w3.org/1999/xlink";
+
+ var graphDiv = this.el[0];
+ var svg = $('#svgGraph')[0];
+ this.svgGraph = svg;
+
+ var gNode = document.createElementNS(this.svgns, 'g');
+ gNode.setAttribute("id", "group");
+ svg.appendChild(gNode);
+ this.mainG = gNode;
+
+ $(svg).svgNavigate();
+ },
+ initializeDefs: function(self) {
+ var def = document.createElementNS(svgns, 'defs');
+ def.setAttributeNS(null, "id", "buttonDefs");
+
+ // ArrowHead
+ var arrowHeadMarker = document.createElementNS(svgns, 'marker');
+ arrowHeadMarker.setAttribute("id", "triangle");
+ arrowHeadMarker.setAttribute("viewBox", "0 0 10 10");
+ arrowHeadMarker.setAttribute("refX", "5");
+ arrowHeadMarker.setAttribute("refY", "5");
+ arrowHeadMarker.setAttribute("markerUnits", "strokeWidth");
+ arrowHeadMarker.setAttribute("markerWidth", "4");
+ arrowHeadMarker.setAttribute("markerHeight", "3");
+ arrowHeadMarker.setAttribute("orient", "auto");
+ var path = document.createElementNS(svgns, 'polyline');
+ arrowHeadMarker.appendChild(path);
+ path.setAttribute("points", "0,0 10,5 0,10 1,5");
+
+ def.appendChild(arrowHeadMarker);
+
+ this.svgGraph.appendChild(def);
+ },
+ render: function(self) {
+ console.log("graph render");
+
+ var data = this.model.get("data");
+ var nodes = data.nodes;
+ var edges = data.edges;
+ if (nodes.length == 0) {
+ console.log("No results");
+ return;
+ };
+
+ // layout
+ layoutGraph(nodes, edges);
+
+ var bounds = {};
+ this.nodes = {};
+ for (var i = 0; i < nodes.length; ++i) {
+ this.nodes[nodes[i].id] = nodes[i];
+ }
+
+ for (var i = 0; i < edges.length; ++i) {
+ this.drawEdge(this, edges[i]);
+ }
+
+ for (var i = 0; i < nodes.length; ++i) {
+ this.drawNode(this, nodes[i], bounds);
+ }
+
+ bounds.minX = bounds.minX ? bounds.minX - 200 : -200;
+ bounds.minY = bounds.minY ? bounds.minY - 200 : -200;
+ bounds.maxX = bounds.maxX ? bounds.maxX + 200 : 200;
+ bounds.maxY = bounds.maxY ? bounds.maxY + 200 : 200;
+
+ this.graphBounds = bounds;
+ this.resetPanZoom();
+ },
+ changeSelected: function(self) {
+ console.log("change selected");
+ var selected = this.model.get("selected");
+ var previous = this.model.previous("selected");
+
+ if (previous) {
+ // Unset previous
+ var g = document.getElementById(previous);
+ removeClass(g, "selected");
+ }
+
+ if (selected) {
+ var g = document.getElementById(selected);
+ var node = this.nodes[selected];
+
+ addClass(g, "selected");
+
+ var offset = 200;
+ var widthHeight = offset*2;
+ var x = node.x - offset;
+ var y = node.y - offset;
+
+
+ $("#svgGraph").svgNavigate("transformToBox", {x: x, y: y, width: widthHeight, height: widthHeight});
+ }
+ },
+ clickGraph: function(self) {
+ console.log("click");
+ if (self.currentTarget.jobid) {
+ this.model.set({"selected": self.currentTarget.jobid});
+ }
+ },
+ drawEdge: function(self, edge) {
+ var svg = self.svgGraph;
+ var svgns = self.svgns;
+
+ var startNode = this.nodes[edge.from];
+ var endNode = this.nodes[edge.target];
+
+ if (edge.guides) {
+ var pointString = "" + startNode.x + "," + startNode.y + " ";
+
+ for (var i = 0; i < edge.guides.length; ++i ) {
+ edgeGuidePoint = edge.guides[i];
+ pointString += edgeGuidePoint.x + "," + edgeGuidePoint.y + " ";
+ }
+
+ pointString += endNode.x + "," + endNode.y;
+ var polyLine = document.createElementNS(svgns, "polyline");
+ polyLine.setAttributeNS(null, "class", "edge");
+ polyLine.setAttributeNS(null, "points", pointString);
+ polyLine.setAttributeNS(null, "style", "fill:none;");
+ self.mainG.appendChild(polyLine);
+ }
+ else {
+ var line = document.createElementNS(svgns, 'line');
+ line.setAttributeNS(null, "class", "edge");
+ line.setAttributeNS(null, "x1", startNode.x);
+ line.setAttributeNS(null, "y1", startNode.y);
+ line.setAttributeNS(null, "x2", endNode.x);
+ line.setAttributeNS(null, "y2", endNode.y);
+
+ self.mainG.appendChild(line);
+ }
+ },
+ drawNode: function(self, node, bounds) {
+ var svg = self.svgGraph;
+ var svgns = self.svgns;
+
+ var xOffset = 10;
+ var yOffset = 10;
+
+ var nodeG = document.createElementNS(svgns, "g");
+ nodeG.setAttributeNS(null, "class", "jobnode");
+ nodeG.setAttributeNS(null, "id", node.id);
+ nodeG.setAttributeNS(null, "font-family", "helvetica");
+ nodeG.setAttributeNS(null, "transform", "translate(" + node.x + "," + node.y + ")");
+
+ var innerG = document.createElementNS(svgns, "g");
+ innerG.setAttributeNS(null, "transform", "translate(-10,-10)");
+
+ var circle = document.createElementNS(svgns, 'circle');
+ circle.setAttributeNS(null, "cy", 10);
+ circle.setAttributeNS(null, "cx", 10);
+ circle.setAttributeNS(null, "r", 12);
+ circle.setAttributeNS(null, "style", "width:inherit;stroke-opacity:1");
+
+
+ var text = document.createElementNS(svgns, 'text');
+ var textLabel = document.createTextNode(node.label);
+ text.appendChild(textLabel);
+ text.setAttributeNS(null, "x", 4);
+ text.setAttributeNS(null, "y", 15);
+ text.setAttributeNS(null, "height", 10);
+
+ this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
+
+ var backRect = document.createElementNS(svgns, 'rect');
+ backRect.setAttributeNS(null, "x", 0);
+ backRect.setAttributeNS(null, "y", 2);
+ backRect.setAttributeNS(null, "class", "backboard");
+ backRect.setAttributeNS(null, "width", 10);
+ backRect.setAttributeNS(null, "height", 15);
+
+ innerG.appendChild(circle);
+ innerG.appendChild(backRect);
+ innerG.appendChild(text);
+ innerG.jobid = node.id;
+
+ nodeG.appendChild(innerG);
+ self.mainG.appendChild(nodeG);
+
+ // Need to get text width after attaching to SVG.
+ var computeText = text.getComputedTextLength();
+ var halfWidth = computeText/2;
+ text.setAttributeNS(null, "x", -halfWidth + 10);
+ backRect.setAttributeNS(null, "x", -halfWidth);
+ backRect.setAttributeNS(null, "width", computeText + 20);
+
+ nodeG.setAttributeNS(null, "class", "node");
+ nodeG.jobid=node.id;
+ $(nodeG).contextMenu({
+ menu: 'jobMenu'
+ },
+ handleJobMenuClick
+ );
+ },
+ addBounds: function(toBounds, addBounds) {
+ toBounds.minX = toBounds.minX ? Math.min(toBounds.minX, addBounds.minX) : addBounds.minX;
+ toBounds.minY = toBounds.minY ? Math.min(toBounds.minY, addBounds.minY) : addBounds.minY;
+ toBounds.maxX = toBounds.maxX ? Math.max(toBounds.maxX, addBounds.maxX) : addBounds.maxX;
+ toBounds.maxY = toBounds.maxY ? Math.max(toBounds.maxY, addBounds.maxY) : addBounds.maxY;
+ },
+ resetPanZoom : function(self) {
+ var bounds = this.graphBounds;
+ $("#svgGraph").svgNavigate("transformToBox", {x: bounds.minX, y: bounds.minY, width: (bounds.maxX - bounds.minX), height: (bounds.maxY - bounds.minY) });
+ }
+});
+
+var graphModel;
+azkaban.GraphModel = Backbone.Model.extend({});
+
+$(function() {
+ var selected;
+
+ if (window.location.hash) {
+ var hash = window.location.hash;
+ if (hash == "#jobslist") {
+ selected = "jobslist";
+ }
+ else if (hash == "#graph") {
+ // Redundant, but we may want to change the default.
+ selected = "graph";
+ }
+ else {
+ selected = "graph";
+ }
+ }
+ flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), selectedView: selected });
+
+ graphModel = new azkaban.GraphModel();
+ svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel});
+ jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel});
+
+ var requestURL = contextURL + "/executor";
+
+ $.get(
+ requestURL,
+ {"execid": execId, "ajax":"fetchexecflow"},
+ function(data) {
+ console.log("data fetched");
+ graphModel.set({data: data});
+ graphModel.trigger("change:graph");
+ },
+ "json"
+ );
+
+});
src/web/js/azkaban.flow.view.js 228(+206 -22)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index c2eaf7e..5c7d0e9 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -81,12 +81,12 @@ var flowTabView;
azkaban.FlowTabView= Backbone.View.extend({
events : {
"click #graphViewLink" : "handleGraphLinkClick",
- "click #jobslistViewLink" : "handleJobslistLinkClick"
+ "click #executionsViewLink" : "handleExecutionLinkClick"
},
initialize : function(settings) {
var selectedView = settings.selectedView;
- if (selectedView == "jobslist") {
- this.handleJobslistLinkClick();
+ if (selectedView == "executions") {
+ this.handleExecutionLinkClick();
}
else {
this.handleGraphLinkClick();
@@ -97,18 +97,19 @@ azkaban.FlowTabView= Backbone.View.extend({
console.log("render graph");
},
handleGraphLinkClick: function(){
- $("#jobslistViewLink").removeClass("selected");
+ $("#executionsViewLink").removeClass("selected");
$("#graphViewLink").addClass("selected");
- $("#jobListView").hide();
+ $("#executionsView").hide();
$("#graphView").show();
},
- handleJobslistLinkClick: function() {
+ handleExecutionLinkClick: function() {
$("#graphViewLink").removeClass("selected");
- $("#jobslistViewLink").addClass("selected");
+ $("#executionsViewLink").addClass("selected");
$("#graphView").hide();
- $("#jobListView").show();
+ $("#executionsView").show();
+ executionModel.trigger("change:view");
}
});
@@ -517,25 +518,177 @@ azkaban.SvgGraphView = Backbone.View.extend({
}
});
-var graphModel;
-azkaban.GraphModel = Backbone.Model.extend({});
+var executionsView;
+azkaban.ExecutionsView = Backbone.View.extend({
+ events: {
+ "click #pageSelection li": "handleChangePageSelection"
+ },
+ initialize: function(settings) {
+ this.model.bind('change:view', this.handleChangeView, this);
+ this.model.bind('render', this.render, this);
+ this.model.set({page: 1, pageSize: 20});
+ this.model.bind('change:page', this.handlePageChange, this);
+ },
+ render: function(evt) {
+ console.log("render");
+ // Render page selections
+ var tbody = $("#execTableBody");
+ tbody.empty();
+
+ var executions = this.model.get("executions");
+ for (var i = 0; i < executions.length; ++i) {
+ var row = document.createElement("tr");
+
+ var tdId = document.createElement("td");
+ $(tdId).text(executions[i].execId);
+ row.appendChild(tdId);
+
+ var tdUser = document.createElement("td");
+ $(tdUser).text(executions[i].submitUser);
+ row.appendChild(tdUser);
+
+ var tdStartTime = document.createElement("td");
+ $(tdStartTime).text(executions[i].startTime);
+ row.appendChild(tdStartTime);
+
+ var tdEndTime = document.createElement("td");
+ $(tdEndTime).text(executions[i].endTime);
+ row.appendChild(tdEndTime);
+
+ var tdElapsed = document.createElement("td");
+ $(tdElapsed).text(executions[i].endTime - executions[i].startTime);
+ row.appendChild(tdElapsed);
+
+ var tdStatus = document.createElement("td");
+ $(tdStatus).text(executions[i].status);
+ row.appendChild(tdStatus);
-$(function() {
- var selected;
-
- if (window.location.hash) {
- var hash = window.location.hash;
- if (hash == "#jobslist") {
- selected = "jobslist";
+ tbody.append(row);
}
- else if (hash == "#graph") {
- // Redundant, but we may want to change the default.
- selected = "graph";
+
+ this.renderPagination(evt);
+ },
+ renderPagination: function(evt) {
+ var total = this.model.get("total");
+ total = total? total : 1;
+ var pageSize = this.model.get("pageSize");
+ var numPages = Math.ceil(total/pageSize);
+
+ this.model.set({"numPages": numPages});
+ var page = this.model.get("page");
+
+ //Start it off
+ $("#pageSelection .selected").removeClass("selected");
+
+ // Disable if less than 5
+ console.log("Num pages " + numPages)
+ var i = 1;
+ for (; i <= numPages && i <= 5; ++i) {
+ $("#page" + i).removeClass("disabled");
+ }
+ for (; i <= 5; ++i) {
+ $("#page" + i).addClass("disabled");
+ }
+
+ // Disable prev/next if necessary.
+ if (page > 1) {
+ $("#previous").removeClass("disabled");
+ $("#previous")[0].page = page - 1;
+ $("#previous a").attr("href", "#page" + (page - 1));
+ }
+ else {
+ $("#previous").addClass("disabled");
+ }
+
+ if (page < numPages) {
+ $("#next")[0].page = page + 1;
+ $("#next").removeClass("disabled");
+ $("#next a").attr("href", "#page" + (page + 1));
}
else {
- selected = "graph";
+ $("#next")[0].page = page + 1;
+ $("#next").addClass("disabled");
}
+
+ // Selection is always in middle unless at barrier.
+ if (page < 3) {
+ selectionPosition = page;
+ }
+ else if (page > numPages - 2) {
+ selectionPosition = 5 - (numPages - page) - 1;
+ }
+ else {
+ selectionPosition = 3;
+ }
+
+ $("#page"+selectionPosition).addClass("selected");
+ $("#page"+selectionPosition)[0].page = page;
+ var selecta = $("#page" + selectionPosition + " a");
+ selecta.text(page);
+ selecta.attr("href", "#page" + page);
+
+ for (var j = 1, tpage = page - selectionPosition + 1; j < selectionPosition; ++j, ++tpage) {
+ $("#page" + j)[0].page = tpage;
+ var a = $("#page" + i + " a");
+ a.text(tpage);
+ a.attr("href", "#page" + tpage);
+ }
+
+ for (var i = selectionPosition + 1, tpage = page + 1; i <= numPages; ++i, ++tpage) {
+ $("#page" + i)[0].page = tpage;
+ var a = $("#page" + i + " a");
+ a.text(tpage);
+ a.attr("href", "#page" + tpage);
+ }
+ },
+ handleChangePageSelection: function(evt) {
+ if ($(evt.currentTarget).hasClass("disabled")) {
+ return;
+ }
+ var page = evt.currentTarget.page;
+
+ this.model.set({"page": page});
+ },
+ handleChangeView: function(evt) {
+ if (this.init) {
+ return;
+ }
+
+ console.log("init");
+ this.handlePageChange(evt);
+ this.init = true;
+ },
+ handlePageChange: function(evt) {
+ var page = this.model.get("page") - 1;
+ var pageSize = this.model.get("pageSize");
+ var requestURL = contextURL + "/manager";
+
+ var model = this.model;
+ $.get(
+ requestURL,
+ {"project": projectName, "flow":flowName, "ajax": "fetchFlowExecutions", "start":page * pageSize, "length": pageSize},
+ function(data) {
+ model.set({"executions": data.executions, "total": data.total});
+ model.trigger("render");
+ },
+ "json"
+ );
+
}
+});
+
+var graphModel;
+azkaban.GraphModel = Backbone.Model.extend({});
+
+var executionModel;
+azkaban.ExecutionModel = Backbone.Model.extend({});
+
+$(function() {
+ var selected;
+ // Execution model has to be created before the window switches the tabs.
+ executionModel = new azkaban.ExecutionModel();
+ executionsView = new azkaban.ExecutionsView({el: $('#executionsView'), model: executionModel});
+
flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), selectedView: selected });
graphModel = new azkaban.GraphModel();
@@ -551,6 +704,30 @@ $(function() {
console.log("data fetched");
graphModel.set({data: data});
graphModel.trigger("change:graph");
+
+ // Handle the hash changes here so the graph finishes rendering first.
+ if (window.location.hash) {
+ var hash = window.location.hash;
+
+ if (hash == "#executions") {
+ flowTabView.handleExecutionLinkClick();
+ }
+ else if (hash == "#graph") {
+ // Redundant, but we may want to change the default.
+ selected = "graph";
+ }
+ else {
+ if ("#page" == hash.substring(0, "#page".length)) {
+ var page = hash.substring("#page".length, hash.length);
+ console.log("page " + page);
+ flowTabView.handleExecutionLinkClick();
+ executionModel.set({"page": parseInt(page)});
+ }
+ else {
+ selected = "graph";
+ }
+ }
+ }
},
"json"
);
@@ -561,10 +738,17 @@ $(function() {
executeURL,
{"project": projectName, "ajax":"executeFlow", "flow":flowName, "disabled":graphModel.get("disabled")},
function(data) {
- alert("call success");
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ var redirectURL = contextURL + "/executor?execid=" + data.execid;
+ window.location.href = redirectURL;
+ }
},
"json"
);
});
+
});