azkaban-uncached
Changes
.classpath 3(+3 -0)
src/java/azkaban/executor/ExecutableFlow.java 151(+2 -149)
src/java/azkaban/executor/ExecutableNode.java 157(+157 -0)
src/java/azkaban/executor/JobRunner.java 19(+16 -3)
src/java/azkaban/executor/NodeStatus.java 105(+105 -0)
src/web/js/azkaban.exflow.view.js 4(+2 -2)
Details
.classpath 3(+3 -0)
diff --git a/.classpath b/.classpath
index 7b8b139..12ee55d 100644
--- a/.classpath
+++ b/.classpath
@@ -29,5 +29,8 @@
<classpathentry kind="lib" path="plugins/ldap/li-azkaban-ldap.jar"/>
<classpathentry kind="lib" path="lib/commons-email-1.2.jar"/>
<classpathentry kind="lib" path="lib/mail-1.4.5.jar"/>
+ <classpathentry kind="lib" path="lib/avro-1.4.1.jar"/>
+ <classpathentry kind="lib" path="lib/commons-configuration-1.8.jar"/>
+ <classpathentry kind="lib" path="lib/voldemort-0.96.jar"/>
<classpathentry kind="output" path="dist/classes"/>
</classpath>
src/java/azkaban/executor/ExecutableFlow.java 151(+2 -149)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 791df64..b3e3a81 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -131,7 +131,7 @@ public class ExecutableFlow {
startNodes = new ArrayList<String>();
for (ExecutableNode node: executableNodes.values()) {
if (node.getInNodes().isEmpty()) {
- startNodes.add(node.id);
+ startNodes.add(node.getId());
}
}
}
@@ -144,7 +144,7 @@ public class ExecutableFlow {
endNodes = new ArrayList<String>();
for (ExecutableNode node: executableNodes.values()) {
if (node.getOutNodes().isEmpty()) {
- endNodes.add(node.id);
+ endNodes.add(node.getId());
}
}
}
@@ -405,151 +405,4 @@ public class ExecutableFlow {
public boolean getNotifyOnLastFailure() {
return this.notifyOnLastFailure;
}
-
- public static class ExecutableNode {
- private String id;
-
- private String type;
- private String jobPropsSource;
- private String inheritPropsSource;
- private String outputPropsSource;
- private Status status = Status.READY;
- private long startTime = -1;
- private long endTime = -1;
- private int level = 0;
- private ExecutableFlow flow;
-
- private Set<String> inNodes = new HashSet<String>();
- private Set<String> outNodes = new HashSet<String>();
-
- private ExecutableNode(Node node, ExecutableFlow flow) {
- id = node.getId();
- type = node.getType();
- jobPropsSource = node.getJobSource();
- inheritPropsSource = node.getPropsSource();
- status = Status.READY;
- level = node.getLevel();
- this.flow = flow;
- }
-
- public ExecutableNode() {
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public void addInNode(String exNode) {
- inNodes.add(exNode);
- }
-
- public void addOutNode(String exNode) {
- outNodes.add(exNode);
- }
-
- public Set<String> getOutNodes() {
- return outNodes;
- }
-
- public Set<String> getInNodes() {
- return inNodes;
- }
-
- public Status getStatus() {
- return status;
- }
-
- public void setStatus(Status status) {
- this.status = status;
- }
-
- public Object toObject() {
- HashMap<String, Object> objMap = new HashMap<String, Object>();
- objMap.put("id", id);
- objMap.put("jobSource", jobPropsSource);
- objMap.put("propSource", inheritPropsSource);
- objMap.put("jobType", type);
- objMap.put("status", status.toString());
- objMap.put("inNodes", inNodes);
- objMap.put("outNodes", outNodes);
- objMap.put("startTime", startTime);
- objMap.put("endTime", endTime);
- objMap.put("level", level);
-
- if (outputPropsSource != null) {
- objMap.put("outputSource", outputPropsSource);
- }
-
- return objMap;
- }
-
- @SuppressWarnings("unchecked")
- public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
- ExecutableNode exNode = new ExecutableNode();
-
- HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
- exNode.id = (String)objMap.get("id");
- exNode.jobPropsSource = (String)objMap.get("jobSource");
- exNode.inheritPropsSource = (String)objMap.get("propSource");
- exNode.outputPropsSource = (String)objMap.get("outputSource");
- exNode.type = (String)objMap.get("jobType");
- exNode.status = Status.valueOf((String)objMap.get("status"));
-
- exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
- exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
-
- exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
- exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
- exNode.level = (Integer)objMap.get("level");
-
- exNode.flow = flow;
-
- return exNode;
- }
-
- @SuppressWarnings("unchecked")
- public void updateNodeFromObject(Object obj) {
- HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
- status = Status.valueOf((String)objMap.get("status"));
-
- startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
- endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- public String getJobPropsSource() {
- return jobPropsSource;
- }
-
- public String getPropsSource() {
- return inheritPropsSource;
- }
-
- public int getLevel() {
- return level;
- }
-
- public ExecutableFlow getFlow() {
- return flow;
- }
- }
}
src/java/azkaban/executor/ExecutableNode.java 157(+157 -0)
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
new file mode 100644
index 0000000..cfb4387
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -0,0 +1,157 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.flow.Node;
+import azkaban.utils.JSONUtils;
+
+public class ExecutableNode {
+ private String id;
+
+ private String type;
+ private String jobPropsSource;
+ private String inheritPropsSource;
+ private String outputPropsSource;
+ private Status status = Status.READY;
+ private long startTime = -1;
+ private long endTime = -1;
+ private int level = 0;
+ private ExecutableFlow flow;
+
+ private Set<String> inNodes = new HashSet<String>();
+ private Set<String> outNodes = new HashSet<String>();
+
+ public ExecutableNode(Node node, ExecutableFlow flow) {
+ id = node.getId();
+ type = node.getType();
+ jobPropsSource = node.getJobSource();
+ inheritPropsSource = node.getPropsSource();
+ status = Status.READY;
+ level = node.getLevel();
+ this.flow = flow;
+ }
+
+ public ExecutableNode() {
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void addInNode(String exNode) {
+ inNodes.add(exNode);
+ }
+
+ public void addOutNode(String exNode) {
+ outNodes.add(exNode);
+ }
+
+ public Set<String> getOutNodes() {
+ return outNodes;
+ }
+
+ public Set<String> getInNodes() {
+ return inNodes;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ public Object toObject() {
+ HashMap<String, Object> objMap = new HashMap<String, Object>();
+ objMap.put("id", id);
+ objMap.put("jobSource", jobPropsSource);
+ objMap.put("propSource", inheritPropsSource);
+ objMap.put("jobType", type);
+ objMap.put("status", status.toString());
+ objMap.put("inNodes", inNodes);
+ objMap.put("outNodes", outNodes);
+ objMap.put("startTime", startTime);
+ objMap.put("endTime", endTime);
+ objMap.put("level", level);
+
+ if (outputPropsSource != null) {
+ objMap.put("outputSource", outputPropsSource);
+ }
+
+ return objMap;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
+ ExecutableNode exNode = new ExecutableNode();
+
+ HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
+ exNode.id = (String)objMap.get("id");
+ exNode.jobPropsSource = (String)objMap.get("jobSource");
+ exNode.inheritPropsSource = (String)objMap.get("propSource");
+ exNode.outputPropsSource = (String)objMap.get("outputSource");
+ exNode.type = (String)objMap.get("jobType");
+ exNode.status = Status.valueOf((String)objMap.get("status"));
+
+ exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
+ exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
+
+ exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+ exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
+ exNode.level = (Integer)objMap.get("level");
+
+ exNode.flow = flow;
+
+ return exNode;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void updateNodeFromObject(Object obj) {
+ HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
+ status = Status.valueOf((String)objMap.get("status"));
+
+ startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+ endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getJobPropsSource() {
+ return jobPropsSource;
+ }
+
+ public String getPropsSource() {
+ return inheritPropsSource;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ public ExecutableFlow getFlow() {
+ return flow;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 03dace2..57dcf13 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,7 +40,6 @@ import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.utils.ExecutableFlowLoader;
@@ -55,6 +54,8 @@ import azkaban.webapp.AzkabanExecutorServer;
public class ExecutorManager {
private static final String ACTIVE_DIR = ".active";
private static final String ARCHIVE_DIR = ".archive";
+ private static final String JOB_EXECUTION_DIR = ".jobexecutions";
+
private static Logger logger = Logger.getLogger(ExecutorManager.class);
// 30 seconds of retry before failure.
private static final long ACCESS_ERROR_THRESHOLD_MS = 30000;
@@ -124,6 +125,40 @@ public class ExecutorManager {
manager.addCache(recentFlowsCache);
}
+ public int getJobHistory(String projectId, String jobId, int numResults, int skip, List<NodeStatus> nodes) throws ExecutorManagerException{
+ File flowProjectPath = new File(basePath, projectId);
+ if (!flowProjectPath.exists()) {
+ throw new ExecutorManagerException("Project " + projectId + " directory doesn't exist.");
+ }
+
+ File jobStatusDir = new File(flowProjectPath, JOB_EXECUTION_DIR + File.separator + jobId);
+
+ File[] jobsStatusFiles = jobStatusDir.listFiles();
+
+ if (jobsStatusFiles.length == 0 || skip >= jobsStatusFiles.length) {
+ return 0;
+ }
+
+ Arrays.sort(jobsStatusFiles);
+ int index = (jobsStatusFiles.length - skip - 1);
+
+ for (int count = 0; count < numResults && index >= 0; ++count, --index) {
+ File exDir = jobsStatusFiles[index];
+
+ NodeStatus status;
+ try {
+ status = NodeStatus.createNodeFromObject(JSONUtils.parseJSONFromFile(exDir));
+ if (status != null) {
+ nodes.add(status);
+ }
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e.getMessage());
+ }
+ }
+
+ return jobsStatusFiles.length;
+ }
+
public int getExecutableFlows(String projectId, String flowId, int from, int maxResults, List<ExecutableFlow> output) {
String projectPath = projectId + File.separator + flowId;
File flowProjectPath = new File(basePath, projectPath);
@@ -183,14 +218,14 @@ public class ExecutorManager {
return execFlows;
}
- public List<ExecutionReference> getFlowHistory(String rePattern, int numResults, int skip) {
+ public List<ExecutionReference> getFlowHistory(String regexPattern, int numResults, int skip) {
ArrayList<ExecutionReference> searchFlows = new ArrayList<ExecutionReference>();
Pattern pattern;
try {
- pattern = Pattern.compile(rePattern, Pattern.CASE_INSENSITIVE);
+ pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) {
- logger.error("Bad regex pattern " + rePattern);
+ logger.error("Bad regex pattern " + regexPattern);
return searchFlows;
}
@@ -316,13 +351,19 @@ public class ExecutorManager {
public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
String executionId = exflow.getExecutionId();
- String projectFlowDir = exflow.getProjectId() + File.separator + exflow.getFlowId() + File.separator + executionId;
- File executionPath = new File(basePath, projectFlowDir);
+ File projectDir = new File(basePath, exflow.getProjectId());
+ String executionDir = exflow.getFlowId() + File.separator + executionId;
+ File executionPath = new File(projectDir, executionDir);
if (executionPath.exists()) {
throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
}
executionPath.mkdirs();
+
+ // create job reference dir
+ File jobExecutionDir = new File(projectDir, JOB_EXECUTION_DIR);
+ jobExecutionDir.mkdirs();
+
exflow.setExecutionPath(executionPath.getPath());
}
@@ -420,7 +461,7 @@ public class ExecutorManager {
}
// Load last update
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
+ updateFlowFromFile(exFlow);
// Return if already finished.
if (exFlow.getStatus() == Status.FAILED ||
@@ -1020,7 +1061,7 @@ public class ExecutorManager {
if (statusStr.equals(ConnectorParams.RESPONSE_NOTFOUND)) {
logger.info("Server status response for " + reference.toRefString() + " was 'notfound'. Cleaning up");
try {
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+ updateFlowFromFile(flow);
forceFail = true;
} catch (ExecutorManagerException e) {
logger.error("Error updating flow status " + flow.getExecutionId() + " from file.", e);
@@ -1033,7 +1074,7 @@ public class ExecutorManager {
if (time > flow.getUpdateTime()) {
try {
- ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+ updateFlowFromFile(flow);
// Update reference
reference.setStartTime(flow.getStartTime());
reference.setEndTime(flow.getEndTime());
@@ -1087,6 +1128,14 @@ public class ExecutorManager {
}
}
+ private void updateFlowFromFile(ExecutableFlow exFlow) throws ExecutorManagerException {
+ File executionDir = new File(exFlow.getExecutionPath());
+ if (ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true)) {
+ // Move all the static directories if the update has changed.
+ File statusDir = new File(basePath, exFlow.getProjectId() + File.separator + JOB_EXECUTION_DIR);
+ ExecutableFlowLoader.moveJobStatusFiles(executionDir, statusDir);
+ }
+ }
/**
* Reference to a Flow Execution.
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 51d8431..970045e 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -22,7 +22,6 @@ import org.apache.log4j.FileAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
src/java/azkaban/executor/JobRunner.java 19(+16 -3)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 5c79ea7..d04c878 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -11,7 +11,6 @@ import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import azkaban.executor.ExecutableFlow.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
@@ -19,6 +18,7 @@ import azkaban.executor.event.EventHandler;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobExecutor.utils.JobWrappingFactory;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
@@ -90,6 +90,17 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ private void writeStatus() {
+ NodeStatus status = new NodeStatus(this.node);
+ String statusName = "_job." + executionId + "." + node.getId() + ".status";
+ File statusFile = new File(workingDir, statusName);
+ try {
+ JSONUtils.toJSON(status.toObject(), statusFile);
+ } catch (IOException e) {
+ logger.error("Couldn't write status file.");
+ }
+ }
+
@Override
public void run() {
node.setStartTime(System.currentTimeMillis());
@@ -106,11 +117,12 @@ public class JobRunner extends EventHandler implements Runnable {
createLogger();
this.node.setStatus(Status.WAITING);
-
+
logInfo("Starting job " + node.getId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
-
+ writeStatus();
+
boolean succeeded = true;
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
@@ -149,6 +161,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
logInfo("Finishing job " + node.getId() + " at " + node.getEndTime());
closeLogger();
+ writeStatus();
}
public synchronized void cancel() {
src/java/azkaban/executor/NodeStatus.java 105(+105 -0)
diff --git a/src/java/azkaban/executor/NodeStatus.java b/src/java/azkaban/executor/NodeStatus.java
new file mode 100644
index 0000000..5719740
--- /dev/null
+++ b/src/java/azkaban/executor/NodeStatus.java
@@ -0,0 +1,105 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.utils.JSONUtils;
+
+// Will need to remove these as we roll out database instead
+public class NodeStatus {
+ private final String execId;
+ private final String projectId;
+ private final String jobId;
+ private final String flowId;
+ private Status status;
+ private long startTime;
+ private long endTime;
+
+ private NodeStatus(String execId, String projectId, String jobId, String flowId) {
+ this.execId = execId;
+ this.projectId = projectId;
+ this.jobId = jobId;
+ this.flowId = flowId;
+ }
+
+ public NodeStatus(ExecutableNode node) {
+ this.execId = node.getFlow().getExecutionId();
+ this.projectId = node.getFlow().getProjectId();
+ this.jobId = node.getId();
+ this.status = node.getStatus();
+ this.startTime = node.getStartTime();
+ this.endTime = node.getEndTime();
+ this.flowId = node.getFlow().getFlowId();
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ public String getExecId() {
+ return execId;
+ }
+
+ public String getProjectId() {
+ return projectId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public Object toObject() {
+ HashMap<String, Object> objMap = new HashMap<String, Object>();
+ objMap.put("execId", execId);
+ objMap.put("jobId", jobId);
+ objMap.put("status", status.toString());
+ objMap.put("startTime", startTime);
+ objMap.put("endTime", endTime);
+ objMap.put("flowId", flowId);
+ return objMap;
+ }
+
+ public static NodeStatus createNodeFromObject(Object obj) {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> objMap = (HashMap<String, Object>)obj;
+ String execId = (String)objMap.get("execId");
+ String projectId = (String)objMap.get("projectId");
+ String jobId = (String)objMap.get("jobId");
+ String flowId = (String)objMap.get("flowId");
+
+ NodeStatus nodeStatus = new NodeStatus(execId, projectId, jobId, flowId);
+ Status status = Status.valueOf((String)objMap.get("status"));
+ long startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+ long endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
+
+ nodeStatus.status = status;
+ nodeStatus.startTime = startTime;
+ nodeStatus.endTime = endTime;
+
+ return nodeStatus;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index e7571d7..4e31562 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -11,6 +11,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.NodeStatus;
public class ExecutableFlowLoader {
private static final Logger logger = Logger.getLogger(ExecutableFlowLoader.class.getName());
@@ -126,6 +127,30 @@ public class ExecutableFlowLoader {
return true;
}
+ public static void moveJobStatusFiles(File exDir, File statusFileDir) {
+ File[] statusFiles = exDir.listFiles(new PrefixSuffixFilter("_job.", ".status"));
+ for (File file: statusFiles) {
+ try {
+ NodeStatus status = NodeStatus.createNodeFromObject(JSONUtils.parseJSONFromFile(file));
+ String jobId = status.getJobId();
+
+ File jobStatusDir = new File(statusFileDir, jobId);
+ if (!jobStatusDir.exists()) {
+ jobStatusDir.mkdirs();
+ }
+
+ File destFile = new File(jobStatusDir, file.getName());
+ if (destFile.exists()) {
+ destFile.delete();
+ }
+
+ file.renameTo(destFile);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
/**
* Write executable flow file
*
@@ -177,7 +202,7 @@ public class ExecutableFlowLoader {
*
*/
private static class PrefixFilter implements FileFilter {
- private String prefix;
+ private final String prefix;
public PrefixFilter(String prefix) {
this.prefix = prefix;
@@ -190,5 +215,24 @@ public class ExecutableFlowLoader {
return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
}
}
+
+ private static class PrefixSuffixFilter implements FileFilter {
+ private final String suffix;
+ private final String prefix;
+ private final int presuflength;
+
+ public PrefixSuffixFilter(String prefix, String suffix) {
+ this.suffix = suffix;
+ this.prefix = prefix;
+ presuflength = suffix.length() + prefix.length();
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden() && name.length() >= presuflength && name.startsWith(prefix) && name.endsWith(suffix);
+ }
+ }
}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f9bb3f6..66679f8 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -15,7 +15,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index d2b0df5..ab84381 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -31,7 +31,10 @@ import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.NodeStatus;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
@@ -50,6 +53,7 @@ import azkaban.utils.Props;
import azkaban.utils.Utils;
import azkaban.webapp.session.Session;
import azkaban.webapp.servlet.MultipartParser;
+import azkaban.webapp.servlet.HistoryServlet.PageSelection;
public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1;
@@ -98,6 +102,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
else if (hasParam(req, "prop")) {
handlePropertyPage(req, resp, session);
}
+ else if (hasParam(req, "history")) {
+ handleJobHistoryPage(req, resp, session);
+ }
else if (hasParam(req, "job")) {
handleJobPage(req, resp, session);
}
@@ -553,6 +560,58 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.render();
}
+ private void handleJobHistoryPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/jobhistorypage.vm");
+ String projectId = getParam(req, "project");
+ String jobId = getParam(req, "job");
+ int pageNum = getIntParam(req, "page", 1);
+ int pageSize = getIntParam(req, "size", 25);
+
+ page.add("projectId", projectId);
+ page.add("jobid", jobId);
+ page.add("page", pageNum);
+
+ int skipPage = (pageNum - 1)*pageSize;
+
+ ArrayList<NodeStatus> statuses = new ArrayList<NodeStatus>();
+ int numResults = 0;
+ try {
+ numResults = executorManager.getJobHistory(projectId, jobId, pageSize, skipPage, statuses);
+ if (statuses.isEmpty()) {
+ statuses = null;
+ }
+ page.add("history", statuses);
+
+ if (pageNum == 1) {
+ page.add("previous", new PageSelection(1, pageSize, true, false));
+ }
+ page.add("next", new PageSelection(pageNum + 1, pageSize, false, false));
+
+ } catch (ExecutorManagerException e) {
+ page.add("errorMsg", e.getMessage());
+ }
+
+ // Now for the 5 other values.
+ int pageStartValue = 1;
+ if (pageNum > 3) {
+ pageStartValue = pageNum - 2;
+ }
+ int maxPage = (numResults / pageSize) + 1;
+
+ page.add("page1", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page2", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page3", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page4", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+ pageStartValue++;
+ page.add("page5", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+ pageStartValue++;
+
+ page.render();
+ }
+
private void handlePermissionPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/permissionspage.vm");
String projectName = getParam(req, "project");
@@ -988,4 +1047,38 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return node1.getLevel() - node2.getLevel();
}
}
+
+ public class PageSelection {
+ private int page;
+ private int size;
+ private boolean disabled;
+ private boolean selected;
+
+ public PageSelection(int page, int size, boolean disabled, boolean selected) {
+ this.page = page;
+ this.size = size;
+ this.disabled = disabled;
+ this.setSelected(selected);
+ }
+
+ public int getPage() {
+ return page;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public boolean getDisabled() {
+ return disabled;
+ }
+
+ public boolean isSelected() {
+ return selected;
+ }
+
+ public void setSelected(boolean selected) {
+ this.selected = selected;
+ }
+ }
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5bfa771..7d0aea7 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -68,7 +68,7 @@
<li><div id="pausebtn" class="btn2">Pause</div></li>
<li><div id="resumebtn" class="btn2">Resume</div></li>
<li><div id="cancelbtn" class="btn6">Cancel</div></li>
- <li><div id="executebtn" class="btn1">Execute</div></li>
+ <li><div id="executebtn" class="btn1">Prepare Execution</div></li>
</ul>
</div>
<div id="graphView">
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 5655c00..edb47c6 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -54,7 +54,7 @@
<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
</div>
- <div id="executebtn" class="btn1">Execute</div>
+ <div id="executebtn" class="btn1">Prepare Execution</div>
<div id="scheduleflowbtn" class="btn2 scheduleflow">Schedule Flow</div>
</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
new file mode 100644
index 0000000..059d5ce
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
@@ -0,0 +1,103 @@
+<!DOCTYPE html>
+<html>
+ <head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+ <script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>
+ <script type="text/javascript" src="${context}/js/namespace.js"></script>
+ <script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+ <script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var errorMessage = null;
+ var successMessage = null;
+
+ var projectId = "$projectId";
+ var jobName = "$jobid";
+ </script>
+ </head>
+ <body>
+#set($current_page="all")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+ <div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
+
+#if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+#else
+ <div class="content">
+ #if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+ #elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+ #end
+ <div id="all-jobs-content">
+ <div class="section-hd">
+ <h2><a href="${context}/manager?project=${projectId}&job=${jobid}&history">Job <span>$jobid</span> History</a></h2>
+ <div class="section-sub-hd">
+ <h4><a href="${context}/manager?project=${projectId}">Project <span>$projectId</span></a></h4>
+ </div>
+
+ </div>
+ </div>
+
+ <div class="executionInfo">
+ <table id="all-jobs" class="all-jobs job-table">
+ <thead>
+ <tr>
+ <th class="execid">Execution Id</th>
+ <th class="jobid">Job</th>
+ <th class="flowid">Flow</th>
+ <th class="date">Start Time</th>
+ <th class="date">End Time</th>
+ <th class="elapse">Elapse</th>
+ <th class="status">Status</th>
+ <th class="logs">Logs</th>
+ </tr>
+ </thead>
+ <tbody>
+ #foreach($job in $history)
+ <tr>
+ <td class="first">
+ <a href="${context}/executor?execid=${job.execId}">$utils.extractNumericalId(${job.execId})</a>
+ </td>
+ <td>
+ <a href="${context}/manager?project=${projectId}&flow=${job.flowId}&job=${jobid}">$jobid</a>
+ </td>
+ <td>
+ <a href="${context}/manager?project=${projectId}&flow=${job.flowId}">${job.flowId}</a>
+ </td>
+ <td>$utils.formatDate(${job.startTime})</td>
+ <td>$utils.formatDate(${job.endTime})</td>
+ <td>$utils.formatDuration(${job.startTime}, ${job.endTime})</td>
+ <td><div class="status ${job.status}">$utils.formatStatus(${job.status})</div></td>
+ <td class="logLink">
+ <a href="${context}/executor?execid=${job.execId}&job=${jobid}">Logs</a>
+ </td>
+ </tr>
+ #end
+ </tbody>
+ </table>
+
+ <div id="pageSelection" class="nonjavascript">
+ <ul>
+ <li id="previous" class="first"><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${previous.page}&size=${previous.size}"><span class="arrow">←</span>Previous</a></li>
+
+ <li id="page1" #if($page1.selected) class="selected" #elseif ($page1.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page1.page}&size=${page1.size}">${page1.page}</a></li>
+ <li id="page2" #if($page2.selected) class="selected" #elseif ($page2.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page2.page}&size=${page2.size}">${page2.page}</a></li>
+ <li id="page3" #if($page3.selected) class="selected" #elseif ($page3.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page3.page}&size=${page3.size}">${page3.page}</a></li>
+ <li id="page4" #if($page4.selected) class="selected" #elseif ($page4.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page4.page}&size=${page4.size}">${page4.page}</a></li>
+ <li id="page5" #if($page5.selected) class="selected" #elseif ($page5.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page5.page}&size=${page5.size}">${page5.page}</a></li>
+
+ <li id="next"><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${next.page}&size=${next.size}">Next<span class="arrow">→</span></a></li>
+ </ul>
+ </div>
+ </div>
+ </div>
+
+#end
+ </body>
+</html>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
index 8b35553..9f47d4b 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -41,6 +41,8 @@
<h4 class="separator">></h4>
<h4><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h4>
</div>
+
+ <a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Project Logs</a>
</div>
</div>
src/web/js/azkaban.exflow.view.js 4(+2 -2)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 207b6c1..aeeb316 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -365,7 +365,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(tdTimeline).append(outerProgressBar);
$(tdTimeline).addClass("timeline");
- var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + node.id;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&job=" + node.id + "&history";
var a = document.createElement("a");
$(a).attr("href", requestURL);
$(a).text(node.id);
@@ -376,7 +376,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(status).attr("id", node.id + "-status-div");
tdStatus.appendChild(status);
- var logURL = contextURL + "/executor?execid=" + execId + "&flow=" + flowName + "&job=" + node.id;
+ var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id;
var a = document.createElement("a");
$(a).attr("href", logURL);
$(a).text("Log");