azkaban-aplcache

Updated common io jar. Got copy + delete of execution mostly

8/11/2012 12:20:24 AM

Details

.classpath 2(+1 -1)

diff --git a/.classpath b/.classpath
index 93e98de..df1b923 100644
--- a/.classpath
+++ b/.classpath
@@ -5,7 +5,6 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>
 	<classpathentry kind="lib" path="lib/commons-fileupload-1.2.1.jar"/>
-	<classpathentry kind="lib" path="lib/commons-io-1.4.jar"/>
 	<classpathentry kind="lib" path="lib/commons-lang-2.6.jar"/>
 	<classpathentry kind="lib" path="lib/jackson-core-asl-1.9.5.jar"/>
 	<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.9.5.jar"/>
@@ -21,5 +20,6 @@
 	<classpathentry kind="lib" path="lib/ehcache-core-2.5.1.jar"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
 	<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
+	<classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
 	<classpathentry kind="output" path="dist"/>
 </classpath>
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 5ddba5f..b4b4881 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -15,6 +15,10 @@ project.manager.class=azkaban.project.FileProjectManager
 file.project.loader.path=projects
 project.global.properties=conf/global.properties
 
+#Execution directory
+execution.directory=execution
+execution.use.symlink=true
+
 # Velocity dev mode
 velocity.dev.mode=true
 
diff --git a/lib/commons-io-2.4.jar b/lib/commons-io-2.4.jar
new file mode 100644
index 0000000..90035a4
Binary files /dev/null and b/lib/commons-io-2.4.jar differ
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 2898066..a43833f 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -16,7 +16,8 @@ import azkaban.utils.Props;
 public class ExecutableFlow {
 	private String executionId;
 	private String flowId;
-	private HashMap<String, Props> sourceProps = new HashMap<String, Props>();
+	private String projectId;
+	private String executionPath;
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes;
 	private ArrayList<String> startNodes = new ArrayList<String>();
@@ -25,55 +26,47 @@ public class ExecutableFlow {
 		FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
 	}
 	
-	private ExecutableFlow() {
+	public ExecutableFlow(String id, Flow flow) {
+		this.executionId = id;
+		this.projectId = flow.getProjectId();
+		this.flowId = flow.getId();
 		
+		this.setFlow(flow);
 	}
 	
-	public void run() {
-		
-	}
-	
-	public void setProps(String source, Props props) {
-		sourceProps.put(source, props);
-	}
-	
-	public void setStatus(String nodeId, Status status) {
-		ExecutableNode exNode = executableNodes.get(nodeId);
-		exNode.setStatus(status);
-	}
-	
-	public static ExecutableFlow createExecutableFlow(Flow flow, HashMap<String, Props> sourceProps) {
-		ExecutableFlow exflow = new ExecutableFlow();
-		exflow.flowId = flow.getId();
-		
-		// We make a copy so that it's effectively immutable
-		exflow.sourceProps = new HashMap<String, Props>();
-		exflow.sourceProps.putAll(sourceProps);
-		
-		HashMap<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+	private void setFlow(Flow flow) {
+		executableNodes = new HashMap<String, ExecutableNode>();
 		
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
 			ExecutableNode exNode = new ExecutableNode(node);
-			nodeMap.put(id, exNode);
+			executableNodes.put(id, exNode);
 		}
 		
 		for (Edge edge: flow.getEdges()) {
-			ExecutableNode sourceNode = nodeMap.get(edge.getSourceId());
-			ExecutableNode targetNode = nodeMap.get(edge.getTargetId());
+			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
+			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
 			
 			sourceNode.addOutNode(edge.getTargetId());
 			targetNode.addInNode(edge.getSourceId());
 		}
 		
-		for (ExecutableNode node : nodeMap.values()) {
+		for (ExecutableNode node : executableNodes.values()) {
 			if (node.getInNodes().size()==0) {
-				exflow.startNodes.add(node.id);
+				startNodes.add(node.id);
 			}
 		}
 		
-		exflow.executableNodes = nodeMap;
-		return exflow;
+		flowProps.putAll(flow.getAllFlowProps());
+	}
+	
+	public void run() {
+		
+	}
+
+	public void setStatus(String nodeId, Status status) {
+		ExecutableNode exNode = executableNodes.get(nodeId);
+		exNode.setStatus(status);
 	}
 	
 	public String getExecutionId() {
@@ -84,17 +77,74 @@ public class ExecutableFlow {
 		this.executionId = executionId;
 	}
 
+	public String getFlowId() {
+		return flowId;
+	}
+
+	public void setFlowId(String flowId) {
+		this.flowId = flowId;
+	}
+
+	public String getProjectId() {
+		return projectId;
+	}
+
+	public void setProjectId(String projectId) {
+		this.projectId = projectId;
+	}
+
+	public String getExecutionPath() {
+		return executionPath;
+	}
+
+	public void setExecutionPath(String executionPath) {
+		this.executionPath = executionPath;
+	}
+
+	public Map<String,Object> toObject() {
+		HashMap<String, Object> flowObj = new HashMap<String, Object>();
+		flowObj.put("type", "executableflow");
+		flowObj.put("execution.id", executionId);
+		flowObj.put("execution.path", executionPath);
+		flowObj.put("flow.id", flowId);
+		flowObj.put("project.id", projectId);
+		
+		ArrayList<Object> nodes = new ArrayList<Object>();
+		for (ExecutableNode node: executableNodes.values()) {
+			nodes.add(node.toObject());
+		}
+		flowObj.put("nodes", nodes);
+
+		return flowObj;
+	}
+
+	public Set<String> getSources() {
+		HashSet<String> set = new HashSet<String>();
+		for (ExecutableNode exNode: executableNodes.values()) {
+			set.add(exNode.getJobPropsSource());
+		}
+		
+		for (FlowProps props: flowProps.values()) {
+			set.add(props.getSource());
+		}
+		return set;
+	}
+	
 	private static class ExecutableNode {
 		private String id;
+		private String type;
 		private String jobPropsSource;
 		private String inheritPropsSource;
 		private Status status;
+		private long startTime = -1;
+		private long endTime = -1;
 		
 		private Set<String> inNodes = new HashSet<String>();
 		private Set<String> outNodes = new HashSet<String>();
 		
 		private ExecutableNode(Node node) {
 			id = node.getId();
+			type = node.getType();
 			jobPropsSource = node.getJobSource();
 			inheritPropsSource = node.getPropsSource();
 			status = Status.READY;
@@ -123,5 +173,44 @@ public class ExecutableFlow {
 		public void setStatus(Status status) {
 			this.status = status;
 		}
+		
+		public Object toObject() {
+			HashMap<String, Object> objMap = new HashMap<String, Object>();
+			objMap.put("id", id);
+			objMap.put("job.source", jobPropsSource);
+			objMap.put("prop.source", inheritPropsSource);
+			objMap.put("job.type", type);
+			objMap.put("status", status.toString());
+			objMap.put("in.nodes", inNodes);
+			objMap.put("out.nodes", outNodes);
+			objMap.put("start.time", startTime);
+			objMap.put("end.time", endTime);
+			
+			return objMap;
+		}
+
+		public long getStartTime() {
+			return startTime;
+		}
+
+		public void setStartTime(long startTime) {
+			this.startTime = startTime;
+		}
+
+		public long getEndTime() {
+			return endTime;
+		}
+
+		public void setEndTime(long endTime) {
+			this.endTime = endTime;
+		}
+		
+		public String getJobPropsSource() {
+			return jobPropsSource;
+		}
+
+		public String getPropsSource() {
+			return inheritPropsSource;
+		}
 	}
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index f739ef1..21cf40c 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1,31 +1,278 @@
 package azkaban.executor;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 public class ExecutorManager {
+	private static String FLOW_PATH = "flows";
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
+	private File basePath;
+	
 	private AtomicLong counter = new AtomicLong();
 	private String token;
 	
 	private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
 	
 	public ExecutorManager(Props props) {
+		basePath = new File(props.getString("execution.directory"));
+		if (!basePath.exists()) {
+			logger.info("Execution directory " + basePath + " not found.");
+			if (basePath.mkdirs()) {
+				logger.info("Execution directory " + basePath + " created.");
+			}
+			else {
+				throw new RuntimeException("Execution directory " + basePath + " does not exist and cannot be created.");
+			}
+		}
+		
 		token = props.getString("executor.shared.token", "");
 		counter.set(0);
 	}
 	
-	public void executeFlow(ExecutableFlow flow) {
+	public synchronized ExecutableFlow createExecutableFlow(Flow flow) {
+		String projectId = flow.getProjectId();
+		
+		File projectExecutionDir = new File(basePath, projectId);
+		String id = flow.getId();
+		
+		// Find execution
+		File executionDir;
+		String executionId;
+		do {
+			executionId = String.valueOf(System.currentTimeMillis()) + "." + id;
+			executionDir = new File(projectExecutionDir, executionId);
+		}
+		while(executionDir.exists());
+		
+		ExecutableFlow exFlow = new ExecutableFlow(executionId, flow);
+		return exFlow;
+	}
+	
+	public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
+		String path = exflow.getExecutionId();
+		String projectFlowDir = exflow.getProjectId() + File.separator + path;
+		File executionPath = new File(basePath, projectFlowDir);
+		if (executionPath.exists()) {
+			throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
+		}
+		
+		executionPath.mkdirs();
+		exflow.setExecutionPath(executionPath.getPath());
+	}
+	
+	public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
+		String executionPath = flow.getExecutionPath();
+		File executionDir = new File(executionPath);
+
+		File resourceFile = writeResourceFile(executionDir, flow);
+		File executableFlowFile = writeExecutableFlowFile(executionDir, flow);
+	}
+	
+	public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
+		String path = exflow.getExecutionPath();
+		File executionPath = new File(path);
+		if (executionPath.exists()) {
+			try {
+				logger.info("Deleting resource path " + executionPath);
+				FileUtils.deleteDirectory(executionPath);
+			} catch (IOException e) {
+				throw new ExecutorManagerException(e.getMessage(), e);
+			}
+		}
+	}
+	
+	private File writeResourceFile(File executionDir, ExecutableFlow flow) throws ExecutorManagerException {
+		// Create a source list.
+		Set<String> sourceFiles = flow.getSources();
+		
+		// Write out the resource files
+		File resourceFile = new File(executionDir, "_" + flow.getExecutionId() + ".resources");
+		if (resourceFile.exists()) {
+			throw new ExecutorManagerException("The resource file " + resourceFile + " already exists. Race condition?");
+		}
+		HashMap<String, Object> resources = createResourcesList(executionDir, executionDir, sourceFiles);
+		BufferedOutputStream out = null;
+		try {
+			logger.info("Writing resource file " + resourceFile);
+			out = new BufferedOutputStream(new FileOutputStream(resourceFile));
+			JSONUtils.toJSON(resources, out, true);
+		} 
+		catch (IOException e) {
+			throw new ExecutorManagerException(e.getMessage(), e);
+		}
+		finally {
+			if (out != null) {
+				try {
+					out.close();
+				} catch (IOException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+		}
 		
+		return resourceFile;
 	}
 	
+	private File writeExecutableFlowFile(File executionDir, ExecutableFlow flow) throws ExecutorManagerException {
+		// Write out the execution file
+		String flowFileName = "_" + flow.getExecutionId() + ".flow";
+		File flowFile = new File(executionDir, flowFileName);
+		if (flowFile.exists()) {
+			throw new ExecutorManagerException("The flow file " + flowFileName + " already exists. Race condition?");
+		}
+
+		BufferedOutputStream out = null;
+		try {
+			logger.info("Writing executable file " + flowFile);
+			out = new BufferedOutputStream(new FileOutputStream(flowFile));
+			JSONUtils.toJSON(flow.toObject(), out, true);
+		} catch (IOException e) {
+			throw new ExecutorManagerException(e.getMessage(), e);
+		}
+		finally {
+			if (out != null) {
+				try {
+					out.close();
+				} catch (IOException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+		}
+		
+		return flowFile;
+	}
+	
+	private HashMap<String, Object> createResourcesList(File baseDir, File dir, Set<String> sourceFiles) {
+		boolean containsSource = false;
+		
+		HashMap<String, Object> directoryMap = new HashMap<String, Object>();
+		String relative = dir.getPath().substring(baseDir.getPath().length(), dir.getPath().length());
+		directoryMap.put("name", dir.getName());
+		directoryMap.put("relative.path", relative);
+		directoryMap.put("type", "directory");
+		
+		ArrayList<Object> children = new ArrayList<Object>();
+		for (File file: dir.listFiles()) {
+			if (file.isDirectory()) {
+				HashMap<String, Object> subDir = createResourcesList(baseDir, file, sourceFiles);
+				containsSource |= (Boolean)subDir.get("used.source");
+				children.add(subDir);
+			}
+			else {
+				HashMap<String, Object> subFile = new HashMap<String, Object>();
+				String subFileName = file.getName();
+				String subFilePath = file.getPath().substring(baseDir.getPath().length() + 1, file.getPath().length());
+				boolean source =  sourceFiles.contains(subFilePath);
+				containsSource |= source;
+				
+				subFile.put("name", subFileName);
+				subFile.put("relative.path", subFilePath);
+				subFile.put("type", "file");
+				subFile.put("used.source", source);
+				subFile.put("size", file.length());
+				subFile.put("modified.date", file.lastModified());
+				children.add(subFile);
+			}
+		}
+		
+		directoryMap.put("children", children);
+		directoryMap.put("used.source", containsSource);
+		return directoryMap;
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void getDeletableResourceList(HashMap<String, Object> sourceTree, Set<String> deletableResourcePaths) {
+		boolean usedSource = (Boolean)sourceTree.get("used.source");
+
+		if (!usedSource) {
+			String relativePath = (String)sourceTree.get("relative.path");
+			deletableResourcePaths.add(relativePath);
+		}
+		else {
+			List<Object> children = (List<Object>)sourceTree.get("children");
+			if (children != null) {
+				for (Object obj: children) {
+					HashMap<String, Object> child = (HashMap<String,Object>)obj;
+					getDeletableResourceList(child, deletableResourcePaths);
+				}
+			}
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void cleanupUnusedFiles(ExecutableFlow exflow) throws ExecutorManagerException {
+		String path = exflow.getExecutionPath();
+		File executionPath = new File(path);
+		
+		String flowFilename = "_" + exflow.getExecutionId() + ".flow";
+		String resourceFilename = "_" + exflow.getExecutionId() + ".resources";
+
+		File resourceFile = new File(executionPath, resourceFilename);
+		if (!resourceFile.exists()) {
+			throw new ExecutorManagerException("Cleaning failed. Resource file " + flowFilename + " doesn't exist.");
+		}
+		
+		HashSet<String> deletableResources = new HashSet<String>();
+		try {
+			HashMap<String, Object> resourceObj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(resourceFile);
+			getDeletableResourceList(resourceObj, deletableResources);
+		} catch (IOException e) {
+			throw new ExecutorManagerException("Cleaning failed. Resource file " + flowFilename + " parse error.", e);
+		}
+		
+		for (String deletable: deletableResources) {
+			File deleteFile = new File(executionPath, deletable);
+			if (deleteFile.exists()) {
+				if (deleteFile.isDirectory()) {
+					logger.info("Deleting directory " + deleteFile);
+					try {
+						FileUtils.deleteDirectory(deleteFile);
+					} catch (IOException e) {
+						logger.error("Failed deleting '" + deleteFile + "'", e);
+					}
+				}
+				else {
+					logger.info("Deleting file " + deleteFile);
+					if(!deleteFile.delete()) {
+						logger.error("Deleting of resource file '" + deleteFile + "' failed.");
+					}
+				}
+			}
+			else {
+				logger.error("Failed deleting '" + deleteFile + "'. File doesn't exist.");
+			}
+		}
+	}
+
 	private class ExecutingFlow implements Runnable {
 		public void run() {
 			
 		}
 	}
+	
+	private void updateRunningJobs() {
+		
+	}
+	
+	private String createUniqueId(String projectId, String flowId) {
+		return null;
+	}
 }
diff --git a/src/java/azkaban/executor/ExecutorManagerException.java b/src/java/azkaban/executor/ExecutorManagerException.java
new file mode 100644
index 0000000..70df2ad
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorManagerException.java
@@ -0,0 +1,13 @@
+package azkaban.executor;
+
+public class ExecutorManagerException extends Exception {
+	private static final long serialVersionUID = 1L;
+
+	public ExecutorManagerException(String message) {
+		super(message);
+	}
+	
+	public ExecutorManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index e35eee2..aaf206a 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -13,6 +13,7 @@ public class Flow {
 		READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
 	}
 	private final String id;
+	private String projectId;
 	private ArrayList<Node> startNodes = null;
 	private ArrayList<Node> endNodes = null;
 	private int numLevels = -1;
@@ -308,4 +309,12 @@ public class Flow {
 	public Map<String, FlowProps> getAllFlowProps() {
 	    return flowProps;
 	}
+
+	public String getProjectId() {
+		return projectId;
+	}
+	
+	public void setProjectId(String projectId) {
+		this.projectId = projectId;
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 94d5b6b..87bccfc 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileWriter;
 import java.io.IOException;
+
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -18,6 +19,7 @@ import net.sf.ehcache.Element;
 import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -82,8 +84,7 @@ public class FileProjectManager implements ProjectManager {
 				logger.info("Directory creation was successful.");
 			} 
 			else {
-				throw new RuntimeException(
-						"FileProjectLoader cannot create directory " + projectDirectory);
+				throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
 			}
 		} 
 		else if (projectDirectory.isFile()) {
@@ -152,6 +153,7 @@ public class FileProjectManager implements ProjectManager {
 
 							try {
 								flow = Flow.flowFromObject(objectizedFlow);
+								flow.setProjectId(project.getName());
 							} 
 							catch (Exception e) {
 								logger.error(
@@ -229,6 +231,7 @@ public class FileProjectManager implements ProjectManager {
 		}
 
 		for (Flow flow : flows.values()) {
+			flow.setProjectId(projectName);
 			try {
 				if (flow.getErrors() != null) {
 					errors.addAll(flow.getErrors());
@@ -498,4 +501,27 @@ public class FileProjectManager implements ProjectManager {
 		return sourceMap;
 	}
 
+	@Override
+	public void copyProjectSourceFilesToDirectory(Project project, File directory) throws ProjectManagerException {
+		
+		if (!directory.exists()) {
+			throw new ProjectManagerException("Destination directory " + directory + " doesn't exist.");
+		}
+		
+		String mySource = project.getName() + File.separatorChar + project.getSource() + File.separatorChar + "src";
+		
+		File projectDir = new File(projectDirectory, mySource);
+		if (!projectDir.exists()) {
+			throw new ProjectManagerException("Project source directory " + mySource + " doesn't exist.");
+		}
+
+		logger.info("Copying from project dir " + projectDir + " to " + directory);
+		try {
+			FileUtils.copyDirectory(projectDir, directory);
+		} catch (IOException e) {
+			throw new ProjectManagerException(e.getMessage());
+		}
+	}
+
+
 }
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 4abf57c..8813316 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -28,4 +28,6 @@ public interface ProjectManager {
 	public Props getProperties(Project project, String source, User user) throws ProjectManagerException;
 
 	public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException;
+	
+	public void copyProjectSourceFilesToDirectory(Project project, File directory) throws ProjectManagerException;
 }
\ No newline at end of file
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index 80ebde6..93e0cd2 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -2,6 +2,7 @@ package azkaban.utils;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,6 +38,24 @@ public class JSONUtils {
 		}
 	}
 
+	public static void toJSON(Object obj, OutputStream stream) {
+		toJSON(obj, stream, false);
+	}
+	
+	public static void toJSON(Object obj, OutputStream stream, boolean prettyPrint) {
+		ObjectMapper mapper = new ObjectMapper();
+		try {
+			if (prettyPrint) {
+				ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
+				writer.writeValue(stream, obj);
+				return;
+			}
+			mapper.writeValue(stream, obj);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+	
 	public static Object parseJSONFromString(String json) throws IOException {
 		ObjectMapper mapper = new ObjectMapper();
 		JsonFactory factory = new JsonFactory();
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 2c643e0..9463f12 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -13,6 +13,7 @@ import javax.servlet.http.HttpServletResponse;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
@@ -96,17 +97,62 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 		
-		ExecutableFlow exflow = ExecutableFlow.createExecutableFlow(flow, sources);
-		
+		// Create ExecutableFlow
+		ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
 		Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
 		for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
 			boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
 			exflow.setStatus(entry.getKey(), nodeDisabled ? Status.IGNORED : Status.READY);
 		}
 		
-		executorManager.executeFlow(exflow);
+		// Create directory
+		try {
+			executorManager.setupExecutableFlow(exflow);
+		} catch (ExecutorManagerException e) {
+			try {
+				executorManager.cleanupAll(exflow);
+			} catch (ExecutorManagerException e1) {
+				e1.printStackTrace();
+			}
+			ret.put("error", e.getMessage());
+			return;
+		}
+
+		// Copy files to the source.
+		File executionDir = new File(exflow.getExecutionPath());
+		try {
+			projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
+		} catch (ProjectManagerException e) {
+			try {
+				executorManager.cleanupAll(exflow);
+			} catch (ExecutorManagerException e1) {
+				e1.printStackTrace();
+			}
+			ret.put("error", e.getMessage());
+			return;
+		}
+		
+		try {
+			executorManager.executeFlow(exflow);
+		} catch (ExecutorManagerException e) {
+			try {
+				executorManager.cleanupAll(exflow);
+			} catch (ExecutorManagerException e1) {
+				e1.printStackTrace();
+			}
+			
+			ret.put("error", e.getMessage());
+			return;
+		}
 		String execId = exflow.getExecutionId();
 		
-		ret.put("execid", "test");
+		// The following is just a test for cleanup
+//		try {
+//			executorManager.cleanupUnusedFiles(exflow);
+//		} catch (ExecutorManagerException e) {
+//			e.printStackTrace();
+//		}
+//		
+		ret.put("execid", execId);
 	}
 }
diff --git a/src/web/js/azkaban.layout.js b/src/web/js/azkaban.layout.js
index 13dc54b..53e7692 100644
--- a/src/web/js/azkaban.layout.js
+++ b/src/web/js/azkaban.layout.js
@@ -50,7 +50,7 @@ function layoutGraph(nodes, edges) {
 		var destNode = nodeMap[dest];
 		
 		var lastNode = srcNode;
-		// TODO GUIDE EDGES
+
 		var guides = [];
 		
 		for (var j = srcNode.level + 1; j < destNode.level; ++j) {
@@ -79,7 +79,19 @@ function layoutGraph(nodes, edges) {
 		
 		spreadLayerSmart(layers[i]);
 	}
-	
+
+	// The top level can get out of alignment, so we do this kick back
+	// manouver before we seriously get started sorting.
+	if (maxLayer > 1) {
+		uncrossWithIn(layers[1]);
+		sort(layers[1]);
+		spreadLayerSmart(layers[1]);
+
+		uncrossWithOut(layers[0]);
+		sort(layers[0]);
+		spreadLayerSmart(layers[0]);
+	}
+
 	// Uncross down
 	for (var i=1; i <= maxLayer; ++i) {
 		uncrossWithIn(layers[i]);
@@ -87,6 +99,7 @@ function layoutGraph(nodes, edges) {
 		spreadLayerSmart(layers[i]);
 	}
 	
+
 	// Space it vertically
 	spaceVertically(layers, maxLayer);