azkaban-aplcache
Changes
.classpath 2(+1 -1)
conf/azkaban.properties 4(+4 -0)
lib/commons-io-2.4.jar 0(+0 -0)
src/java/azkaban/executor/ExecutableFlow.java 151(+120 -31)
src/java/azkaban/executor/ExecutorManager.java 249(+248 -1)
src/java/azkaban/flow/Flow.java 9(+9 -0)
src/java/azkaban/utils/JSONUtils.java 19(+19 -0)
src/web/js/azkaban.layout.js 17(+15 -2)
Details
.classpath 2(+1 -1)
diff --git a/.classpath b/.classpath
index 93e98de..df1b923 100644
--- a/.classpath
+++ b/.classpath
@@ -5,7 +5,6 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>
<classpathentry kind="lib" path="lib/commons-fileupload-1.2.1.jar"/>
- <classpathentry kind="lib" path="lib/commons-io-1.4.jar"/>
<classpathentry kind="lib" path="lib/commons-lang-2.6.jar"/>
<classpathentry kind="lib" path="lib/jackson-core-asl-1.9.5.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.9.5.jar"/>
@@ -21,5 +20,6 @@
<classpathentry kind="lib" path="lib/ehcache-core-2.5.1.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
+ <classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
<classpathentry kind="output" path="dist"/>
</classpath>
conf/azkaban.properties 4(+4 -0)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 5ddba5f..b4b4881 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -15,6 +15,10 @@ project.manager.class=azkaban.project.FileProjectManager
file.project.loader.path=projects
project.global.properties=conf/global.properties
+#Execution directory
+execution.directory=execution
+execution.use.symlink=true
+
# Velocity dev mode
velocity.dev.mode=true
lib/commons-io-2.4.jar 0(+0 -0)
diff --git a/lib/commons-io-2.4.jar b/lib/commons-io-2.4.jar
new file mode 100644
index 0000000..90035a4
Binary files /dev/null and b/lib/commons-io-2.4.jar differ
src/java/azkaban/executor/ExecutableFlow.java 151(+120 -31)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 2898066..a43833f 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -16,7 +16,8 @@ import azkaban.utils.Props;
public class ExecutableFlow {
private String executionId;
private String flowId;
- private HashMap<String, Props> sourceProps = new HashMap<String, Props>();
+ private String projectId;
+ private String executionPath;
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes;
private ArrayList<String> startNodes = new ArrayList<String>();
@@ -25,55 +26,47 @@ public class ExecutableFlow {
FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
}
- private ExecutableFlow() {
+ public ExecutableFlow(String id, Flow flow) {
+ this.executionId = id;
+ this.projectId = flow.getProjectId();
+ this.flowId = flow.getId();
+ this.setFlow(flow);
}
- public void run() {
-
- }
-
- public void setProps(String source, Props props) {
- sourceProps.put(source, props);
- }
-
- public void setStatus(String nodeId, Status status) {
- ExecutableNode exNode = executableNodes.get(nodeId);
- exNode.setStatus(status);
- }
-
- public static ExecutableFlow createExecutableFlow(Flow flow, HashMap<String, Props> sourceProps) {
- ExecutableFlow exflow = new ExecutableFlow();
- exflow.flowId = flow.getId();
-
- // We make a copy so that it's effectively immutable
- exflow.sourceProps = new HashMap<String, Props>();
- exflow.sourceProps.putAll(sourceProps);
-
- HashMap<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+ private void setFlow(Flow flow) {
+ executableNodes = new HashMap<String, ExecutableNode>();
for (Node node: flow.getNodes()) {
String id = node.getId();
ExecutableNode exNode = new ExecutableNode(node);
- nodeMap.put(id, exNode);
+ executableNodes.put(id, exNode);
}
for (Edge edge: flow.getEdges()) {
- ExecutableNode sourceNode = nodeMap.get(edge.getSourceId());
- ExecutableNode targetNode = nodeMap.get(edge.getTargetId());
+ ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
+ ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
sourceNode.addOutNode(edge.getTargetId());
targetNode.addInNode(edge.getSourceId());
}
- for (ExecutableNode node : nodeMap.values()) {
+ for (ExecutableNode node : executableNodes.values()) {
if (node.getInNodes().size()==0) {
- exflow.startNodes.add(node.id);
+ startNodes.add(node.id);
}
}
- exflow.executableNodes = nodeMap;
- return exflow;
+ flowProps.putAll(flow.getAllFlowProps());
+ }
+
+ public void run() {
+
+ }
+
+ public void setStatus(String nodeId, Status status) {
+ ExecutableNode exNode = executableNodes.get(nodeId);
+ exNode.setStatus(status);
}
public String getExecutionId() {
@@ -84,17 +77,74 @@ public class ExecutableFlow {
this.executionId = executionId;
}
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public void setFlowId(String flowId) {
+ this.flowId = flowId;
+ }
+
+ public String getProjectId() {
+ return projectId;
+ }
+
+ public void setProjectId(String projectId) {
+ this.projectId = projectId;
+ }
+
+ public String getExecutionPath() {
+ return executionPath;
+ }
+
+ public void setExecutionPath(String executionPath) {
+ this.executionPath = executionPath;
+ }
+
+ public Map<String,Object> toObject() {
+ HashMap<String, Object> flowObj = new HashMap<String, Object>();
+ flowObj.put("type", "executableflow");
+ flowObj.put("execution.id", executionId);
+ flowObj.put("execution.path", executionPath);
+ flowObj.put("flow.id", flowId);
+ flowObj.put("project.id", projectId);
+
+ ArrayList<Object> nodes = new ArrayList<Object>();
+ for (ExecutableNode node: executableNodes.values()) {
+ nodes.add(node.toObject());
+ }
+ flowObj.put("nodes", nodes);
+
+ return flowObj;
+ }
+
+ public Set<String> getSources() {
+ HashSet<String> set = new HashSet<String>();
+ for (ExecutableNode exNode: executableNodes.values()) {
+ set.add(exNode.getJobPropsSource());
+ }
+
+ for (FlowProps props: flowProps.values()) {
+ set.add(props.getSource());
+ }
+ return set;
+ }
+
private static class ExecutableNode {
private String id;
+ private String type;
private String jobPropsSource;
private String inheritPropsSource;
private Status status;
+ private long startTime = -1;
+ private long endTime = -1;
private Set<String> inNodes = new HashSet<String>();
private Set<String> outNodes = new HashSet<String>();
private ExecutableNode(Node node) {
id = node.getId();
+ type = node.getType();
jobPropsSource = node.getJobSource();
inheritPropsSource = node.getPropsSource();
status = Status.READY;
@@ -123,5 +173,44 @@ public class ExecutableFlow {
public void setStatus(Status status) {
this.status = status;
}
+
+ public Object toObject() {
+ HashMap<String, Object> objMap = new HashMap<String, Object>();
+ objMap.put("id", id);
+ objMap.put("job.source", jobPropsSource);
+ objMap.put("prop.source", inheritPropsSource);
+ objMap.put("job.type", type);
+ objMap.put("status", status.toString());
+ objMap.put("in.nodes", inNodes);
+ objMap.put("out.nodes", outNodes);
+ objMap.put("start.time", startTime);
+ objMap.put("end.time", endTime);
+
+ return objMap;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getJobPropsSource() {
+ return jobPropsSource;
+ }
+
+ public String getPropsSource() {
+ return inheritPropsSource;
+ }
}
}
src/java/azkaban/executor/ExecutorManager.java 249(+248 -1)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index f739ef1..21cf40c 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1,31 +1,278 @@
package azkaban.executor;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
public class ExecutorManager {
+ private static String FLOW_PATH = "flows";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
+ private File basePath;
+
private AtomicLong counter = new AtomicLong();
private String token;
private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
public ExecutorManager(Props props) {
+ basePath = new File(props.getString("execution.directory"));
+ if (!basePath.exists()) {
+ logger.info("Execution directory " + basePath + " not found.");
+ if (basePath.mkdirs()) {
+ logger.info("Execution directory " + basePath + " created.");
+ }
+ else {
+ throw new RuntimeException("Execution directory " + basePath + " does not exist and cannot be created.");
+ }
+ }
+
token = props.getString("executor.shared.token", "");
counter.set(0);
}
- public void executeFlow(ExecutableFlow flow) {
+ public synchronized ExecutableFlow createExecutableFlow(Flow flow) {
+ String projectId = flow.getProjectId();
+
+ File projectExecutionDir = new File(basePath, projectId);
+ String id = flow.getId();
+
+ // Find execution
+ File executionDir;
+ String executionId;
+ do {
+ executionId = String.valueOf(System.currentTimeMillis()) + "." + id;
+ executionDir = new File(projectExecutionDir, executionId);
+ }
+ while(executionDir.exists());
+
+ ExecutableFlow exFlow = new ExecutableFlow(executionId, flow);
+ return exFlow;
+ }
+
+ public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
+ String path = exflow.getExecutionId();
+ String projectFlowDir = exflow.getProjectId() + File.separator + path;
+ File executionPath = new File(basePath, projectFlowDir);
+ if (executionPath.exists()) {
+ throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
+ }
+
+ executionPath.mkdirs();
+ exflow.setExecutionPath(executionPath.getPath());
+ }
+
+ public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
+ String executionPath = flow.getExecutionPath();
+ File executionDir = new File(executionPath);
+
+ File resourceFile = writeResourceFile(executionDir, flow);
+ File executableFlowFile = writeExecutableFlowFile(executionDir, flow);
+ }
+
+ public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
+ String path = exflow.getExecutionPath();
+ File executionPath = new File(path);
+ if (executionPath.exists()) {
+ try {
+ logger.info("Deleting resource path " + executionPath);
+ FileUtils.deleteDirectory(executionPath);
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e.getMessage(), e);
+ }
+ }
+ }
+
+ private File writeResourceFile(File executionDir, ExecutableFlow flow) throws ExecutorManagerException {
+ // Create a source list.
+ Set<String> sourceFiles = flow.getSources();
+
+ // Write out the resource files
+ File resourceFile = new File(executionDir, "_" + flow.getExecutionId() + ".resources");
+ if (resourceFile.exists()) {
+ throw new ExecutorManagerException("The resource file " + resourceFile + " already exists. Race condition?");
+ }
+ HashMap<String, Object> resources = createResourcesList(executionDir, executionDir, sourceFiles);
+ BufferedOutputStream out = null;
+ try {
+ logger.info("Writing resource file " + resourceFile);
+ out = new BufferedOutputStream(new FileOutputStream(resourceFile));
+ JSONUtils.toJSON(resources, out, true);
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(e.getMessage(), e);
+ }
+ finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ return resourceFile;
}
+ private File writeExecutableFlowFile(File executionDir, ExecutableFlow flow) throws ExecutorManagerException {
+ // Write out the execution file
+ String flowFileName = "_" + flow.getExecutionId() + ".flow";
+ File flowFile = new File(executionDir, flowFileName);
+ if (flowFile.exists()) {
+ throw new ExecutorManagerException("The flow file " + flowFileName + " already exists. Race condition?");
+ }
+
+ BufferedOutputStream out = null;
+ try {
+ logger.info("Writing executable file " + flowFile);
+ out = new BufferedOutputStream(new FileOutputStream(flowFile));
+ JSONUtils.toJSON(flow.toObject(), out, true);
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e.getMessage(), e);
+ }
+ finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ return flowFile;
+ }
+
+ private HashMap<String, Object> createResourcesList(File baseDir, File dir, Set<String> sourceFiles) {
+ boolean containsSource = false;
+
+ HashMap<String, Object> directoryMap = new HashMap<String, Object>();
+ String relative = dir.getPath().substring(baseDir.getPath().length(), dir.getPath().length());
+ directoryMap.put("name", dir.getName());
+ directoryMap.put("relative.path", relative);
+ directoryMap.put("type", "directory");
+
+ ArrayList<Object> children = new ArrayList<Object>();
+ for (File file: dir.listFiles()) {
+ if (file.isDirectory()) {
+ HashMap<String, Object> subDir = createResourcesList(baseDir, file, sourceFiles);
+ containsSource |= (Boolean)subDir.get("used.source");
+ children.add(subDir);
+ }
+ else {
+ HashMap<String, Object> subFile = new HashMap<String, Object>();
+ String subFileName = file.getName();
+ String subFilePath = file.getPath().substring(baseDir.getPath().length() + 1, file.getPath().length());
+ boolean source = sourceFiles.contains(subFilePath);
+ containsSource |= source;
+
+ subFile.put("name", subFileName);
+ subFile.put("relative.path", subFilePath);
+ subFile.put("type", "file");
+ subFile.put("used.source", source);
+ subFile.put("size", file.length());
+ subFile.put("modified.date", file.lastModified());
+ children.add(subFile);
+ }
+ }
+
+ directoryMap.put("children", children);
+ directoryMap.put("used.source", containsSource);
+ return directoryMap;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void getDeletableResourceList(HashMap<String, Object> sourceTree, Set<String> deletableResourcePaths) {
+ boolean usedSource = (Boolean)sourceTree.get("used.source");
+
+ if (!usedSource) {
+ String relativePath = (String)sourceTree.get("relative.path");
+ deletableResourcePaths.add(relativePath);
+ }
+ else {
+ List<Object> children = (List<Object>)sourceTree.get("children");
+ if (children != null) {
+ for (Object obj: children) {
+ HashMap<String, Object> child = (HashMap<String,Object>)obj;
+ getDeletableResourceList(child, deletableResourcePaths);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void cleanupUnusedFiles(ExecutableFlow exflow) throws ExecutorManagerException {
+ String path = exflow.getExecutionPath();
+ File executionPath = new File(path);
+
+ String flowFilename = "_" + exflow.getExecutionId() + ".flow";
+ String resourceFilename = "_" + exflow.getExecutionId() + ".resources";
+
+ File resourceFile = new File(executionPath, resourceFilename);
+ if (!resourceFile.exists()) {
+ throw new ExecutorManagerException("Cleaning failed. Resource file " + flowFilename + " doesn't exist.");
+ }
+
+ HashSet<String> deletableResources = new HashSet<String>();
+ try {
+ HashMap<String, Object> resourceObj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(resourceFile);
+ getDeletableResourceList(resourceObj, deletableResources);
+ } catch (IOException e) {
+ throw new ExecutorManagerException("Cleaning failed. Resource file " + flowFilename + " parse error.", e);
+ }
+
+ for (String deletable: deletableResources) {
+ File deleteFile = new File(executionPath, deletable);
+ if (deleteFile.exists()) {
+ if (deleteFile.isDirectory()) {
+ logger.info("Deleting directory " + deleteFile);
+ try {
+ FileUtils.deleteDirectory(deleteFile);
+ } catch (IOException e) {
+ logger.error("Failed deleting '" + deleteFile + "'", e);
+ }
+ }
+ else {
+ logger.info("Deleting file " + deleteFile);
+ if(!deleteFile.delete()) {
+ logger.error("Deleting of resource file '" + deleteFile + "' failed.");
+ }
+ }
+ }
+ else {
+ logger.error("Failed deleting '" + deleteFile + "'. File doesn't exist.");
+ }
+ }
+ }
+
private class ExecutingFlow implements Runnable {
public void run() {
}
}
+
+ private void updateRunningJobs() {
+
+ }
+
+ private String createUniqueId(String projectId, String flowId) {
+ return null;
+ }
}
diff --git a/src/java/azkaban/executor/ExecutorManagerException.java b/src/java/azkaban/executor/ExecutorManagerException.java
new file mode 100644
index 0000000..70df2ad
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerException.java
@@ -0,0 +1,13 @@
+package azkaban.executor;
+
+public class ExecutorManagerException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public ExecutorManagerException(String message) {
+ super(message);
+ }
+
+ public ExecutorManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
src/java/azkaban/flow/Flow.java 9(+9 -0)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index e35eee2..aaf206a 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -13,6 +13,7 @@ public class Flow {
READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
}
private final String id;
+ private String projectId;
private ArrayList<Node> startNodes = null;
private ArrayList<Node> endNodes = null;
private int numLevels = -1;
@@ -308,4 +309,12 @@ public class Flow {
public Map<String, FlowProps> getAllFlowProps() {
return flowProps;
}
+
+ public String getProjectId() {
+ return projectId;
+ }
+
+ public void setProjectId(String projectId) {
+ this.projectId = projectId;
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 94d5b6b..87bccfc 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -4,6 +4,7 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FileWriter;
import java.io.IOException;
+
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -18,6 +19,7 @@ import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -82,8 +84,7 @@ public class FileProjectManager implements ProjectManager {
logger.info("Directory creation was successful.");
}
else {
- throw new RuntimeException(
- "FileProjectLoader cannot create directory " + projectDirectory);
+ throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
}
}
else if (projectDirectory.isFile()) {
@@ -152,6 +153,7 @@ public class FileProjectManager implements ProjectManager {
try {
flow = Flow.flowFromObject(objectizedFlow);
+ flow.setProjectId(project.getName());
}
catch (Exception e) {
logger.error(
@@ -229,6 +231,7 @@ public class FileProjectManager implements ProjectManager {
}
for (Flow flow : flows.values()) {
+ flow.setProjectId(projectName);
try {
if (flow.getErrors() != null) {
errors.addAll(flow.getErrors());
@@ -498,4 +501,27 @@ public class FileProjectManager implements ProjectManager {
return sourceMap;
}
+ @Override
+ public void copyProjectSourceFilesToDirectory(Project project, File directory) throws ProjectManagerException {
+
+ if (!directory.exists()) {
+ throw new ProjectManagerException("Destination directory " + directory + " doesn't exist.");
+ }
+
+ String mySource = project.getName() + File.separatorChar + project.getSource() + File.separatorChar + "src";
+
+ File projectDir = new File(projectDirectory, mySource);
+ if (!projectDir.exists()) {
+ throw new ProjectManagerException("Project source directory " + mySource + " doesn't exist.");
+ }
+
+ logger.info("Copying from project dir " + projectDir + " to " + directory);
+ try {
+ FileUtils.copyDirectory(projectDir, directory);
+ } catch (IOException e) {
+ throw new ProjectManagerException(e.getMessage());
+ }
+ }
+
+
}
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 4abf57c..8813316 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -28,4 +28,6 @@ public interface ProjectManager {
public Props getProperties(Project project, String source, User user) throws ProjectManagerException;
public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException;
+
+ public void copyProjectSourceFilesToDirectory(Project project, File directory) throws ProjectManagerException;
}
\ No newline at end of file
src/java/azkaban/utils/JSONUtils.java 19(+19 -0)
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index 80ebde6..93e0cd2 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -2,6 +2,7 @@ package azkaban.utils;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +38,24 @@ public class JSONUtils {
}
}
+ public static void toJSON(Object obj, OutputStream stream) {
+ toJSON(obj, stream, false);
+ }
+
+ public static void toJSON(Object obj, OutputStream stream, boolean prettyPrint) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ if (prettyPrint) {
+ ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
+ writer.writeValue(stream, obj);
+ return;
+ }
+ mapper.writeValue(stream, obj);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static Object parseJSONFromString(String json) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonFactory factory = new JsonFactory();
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 2c643e0..9463f12 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -13,6 +13,7 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
@@ -96,17 +97,62 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- ExecutableFlow exflow = ExecutableFlow.createExecutableFlow(flow, sources);
-
+ // Create ExecutableFlow
+ ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
exflow.setStatus(entry.getKey(), nodeDisabled ? Status.IGNORED : Status.READY);
}
- executorManager.executeFlow(exflow);
+ // Create directory
+ try {
+ executorManager.setupExecutableFlow(exflow);
+ } catch (ExecutorManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+ ret.put("error", e.getMessage());
+ return;
+ }
+
+ // Copy files to the source.
+ File executionDir = new File(exflow.getExecutionPath());
+ try {
+ projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
+ } catch (ProjectManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+ ret.put("error", e.getMessage());
+ return;
+ }
+
+ try {
+ executorManager.executeFlow(exflow);
+ } catch (ExecutorManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+
+ ret.put("error", e.getMessage());
+ return;
+ }
String execId = exflow.getExecutionId();
- ret.put("execid", "test");
+ // The following is just a test for cleanup
+// try {
+// executorManager.cleanupUnusedFiles(exflow);
+// } catch (ExecutorManagerException e) {
+// e.printStackTrace();
+// }
+//
+ ret.put("execid", execId);
}
}
src/web/js/azkaban.layout.js 17(+15 -2)
diff --git a/src/web/js/azkaban.layout.js b/src/web/js/azkaban.layout.js
index 13dc54b..53e7692 100644
--- a/src/web/js/azkaban.layout.js
+++ b/src/web/js/azkaban.layout.js
@@ -50,7 +50,7 @@ function layoutGraph(nodes, edges) {
var destNode = nodeMap[dest];
var lastNode = srcNode;
- // TODO GUIDE EDGES
+
var guides = [];
for (var j = srcNode.level + 1; j < destNode.level; ++j) {
@@ -79,7 +79,19 @@ function layoutGraph(nodes, edges) {
spreadLayerSmart(layers[i]);
}
-
+
+ // The top level can get out of alignment, so we do this kick back
+ // manouver before we seriously get started sorting.
+ if (maxLayer > 1) {
+ uncrossWithIn(layers[1]);
+ sort(layers[1]);
+ spreadLayerSmart(layers[1]);
+
+ uncrossWithOut(layers[0]);
+ sort(layers[0]);
+ spreadLayerSmart(layers[0]);
+ }
+
// Uncross down
for (var i=1; i <= maxLayer; ++i) {
uncrossWithIn(layers[i]);
@@ -87,6 +99,7 @@ function layoutGraph(nodes, edges) {
spreadLayerSmart(layers[i]);
}
+
// Space it vertically
spaceVertically(layers, maxLayer);