azkaban-aplcache

Details

diff --git a/src/java/azkaban/executor/AzkabanExecutorServer.java b/src/java/azkaban/executor/AzkabanExecutorServer.java
index f781e97..902d6c8 100644
--- a/src/java/azkaban/executor/AzkabanExecutorServer.java
+++ b/src/java/azkaban/executor/AzkabanExecutorServer.java
@@ -19,7 +19,6 @@ package azkaban.executor;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.TimeZone;
 
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 580d95d..2898066 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -1,20 +1,127 @@
 package azkaban.executor;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import azkaban.flow.Edge;
 import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.flow.Node;
 import azkaban.utils.Props;
 
 public class ExecutableFlow {
-	private Flow flow;
+	private String executionId;
+	private String flowId;
 	private HashMap<String, Props> sourceProps = new HashMap<String, Props>();
+	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
+	private HashMap<String, ExecutableNode> executableNodes;
+	private ArrayList<String> startNodes = new ArrayList<String>();
 	
-	public ExecutableFlow(Flow flow, HashMap<String,Props> sourceProps) {
-		this.flow = flow;
-		this.sourceProps = sourceProps;
+	public enum Status {
+		FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
+	}
+	
+	private ExecutableFlow() {
+		
 	}
 	
 	public void run() {
 		
 	}
+	
+	public void setProps(String source, Props props) {
+		sourceProps.put(source, props);
+	}
+	
+	public void setStatus(String nodeId, Status status) {
+		ExecutableNode exNode = executableNodes.get(nodeId);
+		exNode.setStatus(status);
+	}
+	
+	public static ExecutableFlow createExecutableFlow(Flow flow, HashMap<String, Props> sourceProps) {
+		ExecutableFlow exflow = new ExecutableFlow();
+		exflow.flowId = flow.getId();
+		
+		// We make a copy so that it's effectively immutable
+		exflow.sourceProps = new HashMap<String, Props>();
+		exflow.sourceProps.putAll(sourceProps);
+		
+		HashMap<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		for (Node node: flow.getNodes()) {
+			String id = node.getId();
+			ExecutableNode exNode = new ExecutableNode(node);
+			nodeMap.put(id, exNode);
+		}
+		
+		for (Edge edge: flow.getEdges()) {
+			ExecutableNode sourceNode = nodeMap.get(edge.getSourceId());
+			ExecutableNode targetNode = nodeMap.get(edge.getTargetId());
+			
+			sourceNode.addOutNode(edge.getTargetId());
+			targetNode.addInNode(edge.getSourceId());
+		}
+		
+		for (ExecutableNode node : nodeMap.values()) {
+			if (node.getInNodes().size()==0) {
+				exflow.startNodes.add(node.id);
+			}
+		}
+		
+		exflow.executableNodes = nodeMap;
+		return exflow;
+	}
+	
+	public String getExecutionId() {
+		return executionId;
+	}
+
+	public void setExecutionId(String executionId) {
+		this.executionId = executionId;
+	}
+
+	private static class ExecutableNode {
+		private String id;
+		private String jobPropsSource;
+		private String inheritPropsSource;
+		private Status status;
+		
+		private Set<String> inNodes = new HashSet<String>();
+		private Set<String> outNodes = new HashSet<String>();
+		
+		private ExecutableNode(Node node) {
+			id = node.getId();
+			jobPropsSource = node.getJobSource();
+			inheritPropsSource = node.getPropsSource();
+			status = Status.READY;
+		}
+		
+		public void addInNode(String exNode) {
+			inNodes.add(exNode);
+		}
+		
+		public void addOutNode(String exNode) {
+			outNodes.add(exNode);
+		}
+		
+		public Set<String> getOutNodes() {
+			return outNodes;
+		}
+		
+		public Set<String> getInNodes() {
+			return inNodes;
+		}
+		
+		public Status getStatus() {
+			return status;
+		}
+		
+		public void setStatus(Status status) {
+			this.status = status;
+		}
+	}
 }
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
new file mode 100644
index 0000000..5c6476a
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -0,0 +1,9 @@
+package azkaban.executor;
+
+public interface ExecutorLoader {
+	public String getUniqueExecutionId();
+	
+	public void commitExecutableFlow(ExecutableFlow exflow);
+	
+	public ExecutableFlow loadExecutableFlow(String executionId);
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index a099c60..f739ef1 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1,19 +1,29 @@
 package azkaban.executor;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
 import azkaban.utils.Props;
 
 public class ExecutorManager {
+	private static Logger logger = Logger.getLogger(ExecutorManager.class);
+	private AtomicLong counter = new AtomicLong();
 	private String token;
 	
+	private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
+	
 	public ExecutorManager(Props props) {
 		token = props.getString("executor.shared.token", "");
+		counter.set(0);
 	}
 	
-	public void executeJob() {
+	public void executeFlow(ExecutableFlow flow) {
 		
 	}
 	
-	private class ExecutorThread extends Thread {
+	private class ExecutingFlow implements Runnable {
 		public void run() {
 			
 		}
diff --git a/src/java/azkaban/executor/ExecutorServlet.java b/src/java/azkaban/executor/ExecutorServlet.java
index 3d57999..dbc5c59 100644
--- a/src/java/azkaban/executor/ExecutorServlet.java
+++ b/src/java/azkaban/executor/ExecutorServlet.java
@@ -9,6 +9,10 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.log4j.Logger;
 
 public class ExecutorServlet extends HttpServlet {
+	public enum State {
+		FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
+	}
+	
 	private static final Logger logger = Logger.getLogger(ExecutorServlet.class.getName());
 	private String sharedToken;
 	
diff --git a/src/java/azkaban/executor/FileExecutorLoader.java b/src/java/azkaban/executor/FileExecutorLoader.java
new file mode 100644
index 0000000..b93980c
--- /dev/null
+++ b/src/java/azkaban/executor/FileExecutorLoader.java
@@ -0,0 +1,18 @@
+package azkaban.executor;
+
+public class FileExecutorLoader implements ExecutorLoader {
+	@Override
+	public String getUniqueExecutionId() {
+		return "";
+	}
+	
+	@Override
+	public void commitExecutableFlow(ExecutableFlow flow) {
+		
+	}
+	
+	@Override
+	public ExecutableFlow loadExecutableFlow(String executionId) {
+		return null;
+	}
+}
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 86cd159..e35eee2 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -304,4 +304,8 @@ public class Flow {
 	public FlowProps getFlowProps(String propSource) {
 		return flowProps.get(propSource);
 	}
+	
+	public Map<String, FlowProps> getAllFlowProps() {
+	    return flowProps;
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index 4775757..2192468 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -7,14 +7,10 @@ import java.util.Map;
 import azkaban.utils.Utils;
 
 public class Node {
-	public enum State {
-		FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED
-	}
 
 	private final String id;
 	private String jobSource;
 	private String propsSource;
-	private State state = State.WAITING;
 
 	private Point2D position = null;
 	private int level;
@@ -33,16 +29,11 @@ public class Node {
 		this.id = clone.id;
 		this.propsSource = clone.propsSource;
 		this.jobSource = clone.jobSource;
-		this.state = clone.state;
 	}
 
 	public String getId() {
 		return id;
 	}
-	
-	public State getState() {
-		return state;
-	}
 
 	public String getType() {
 		return type;
@@ -51,10 +42,6 @@ public class Node {
 	public void setType(String type) {
 		this.type = type;
 	}
-	
-	public void setState(State state) {
-		this.state = state;
-	}
 
 	public Point2D getPosition() {
 		return position;
@@ -118,14 +105,6 @@ public class Node {
 		if (expectedRuntime != null) {
 			node.setExpectedRuntimeSec(expectedRuntime);
 		}
-
-		String stateStr = (String)mapObj.get("status");
-		if (stateStr != null) {
-			State state = State.valueOf(stateStr);
-			if (state != null) {
-				node.setState(state);
-			}
-		}
 		
 		Map<String,Object> layoutInfo = (Map<String,Object>)mapObj.get("layout");
 		if (layoutInfo != null) {
@@ -160,7 +139,6 @@ public class Node {
 		objMap.put("prop.source", propsSource);
 		objMap.put("job.type", type);
 		objMap.put("expectedRuntime", expectedRunTimeSec);
-		objMap.put("state", state.toString());
 
 		HashMap<String, Object> layoutInfo = new HashMap<String, Object>();
 		if (position != null) {
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 6bb7074..94d5b6b 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -6,6 +6,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -22,6 +23,8 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.flow.Node;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
@@ -30,9 +33,9 @@ import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 /**
- * A project loader that stores everything on local file system.
- * The following global parameters should be set -
- * file.project.loader.path - The project install path where projects will be loaded installed to.
+ * A project loader that stores everything on local file system. The following
+ * global parameters should be set - file.project.loader.path - The project
+ * install path where projects will be loaded installed to.
  */
 public class FileProjectManager implements ProjectManager {
 	public static final String DIRECTORY_PARAM = "file.project.loader.path";
@@ -42,10 +45,11 @@ public class FileProjectManager implements ProjectManager {
 	private static final String FLOW_EXTENSION = ".flow";
 	private static final Logger logger = Logger.getLogger(FileProjectManager.class);
 	private static final int IDLE_SECONDS = 120;
+	
 	private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
 	private CacheManager manager = CacheManager.create();
 	private Cache sourceCache;
-	
+
 	private File projectDirectory;
 
 	public FileProjectManager(Props props) {
@@ -55,32 +59,33 @@ public class FileProjectManager implements ProjectManager {
 	}
 
 	private void setupCache() {
-		CacheConfiguration cacheConfig = 
-				new CacheConfiguration("propsCache", 2000)
-					.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
-					.overflowToDisk(false)
-					.eternal(false)
-					.timeToIdleSeconds(IDLE_SECONDS)
-					.diskPersistent(false)
-					.diskExpiryThreadIntervalSeconds(0);
+		CacheConfiguration cacheConfig = new CacheConfiguration("propsCache",2000)
+				.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
+				.overflowToDisk(false)
+				.eternal(false)
+				.timeToIdleSeconds(IDLE_SECONDS)
+				.diskPersistent(false)
+				.diskExpiryThreadIntervalSeconds(0);
 
 		sourceCache = new Cache(cacheConfig);
 		manager.addCache(sourceCache);
 	}
-	
+
 	private void setupDirectories(Props props) {
 		String projectDir = props.getString(DIRECTORY_PARAM);
 		logger.info("Using directory " + projectDir + " as the project directory.");
 		projectDirectory = new File(projectDir);
+		
 		if (!projectDirectory.exists()) {
 			logger.info("Directory " + projectDir + " doesn't exist. Creating.");
 			if (projectDirectory.mkdirs()) {
 				logger.info("Directory creation was successful.");
-			}
+			} 
 			else {
-				throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
+				throw new RuntimeException(
+						"FileProjectLoader cannot create directory " + projectDirectory);
 			}
-		}
+		} 
 		else if (projectDirectory.isFile()) {
 			throw new RuntimeException("FileProjectManager directory " + projectDirectory + " is really a file.");
 		}
@@ -88,67 +93,77 @@ public class FileProjectManager implements ProjectManager {
 
 	private void loadAllProjects() {
 		File[] directories = projectDirectory.listFiles();
-		
-		for (File dir: directories) {
+
+		for (File dir : directories) {
 			if (!dir.isDirectory()) {
-				logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory." );
-			}
+				logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory.");
+			} 
 			else {
 				File propertiesFile = new File(dir, PROPERTIES_FILENAME);
 				if (!propertiesFile.exists()) {
-					logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " not found." );
-				}
+					logger.error("ERROR loading project from " + dir.getPath()
+							+ ". Project file " + PROPERTIES_FILENAME
+							+ " not found.");
+				} 
 				else {
 					Object obj = null;
 					try {
 						obj = JSONUtils.parseJSONFromFile(propertiesFile);
-					} catch (IOException e) {
-						logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e );
+					} 
+					catch (IOException e) {
+						logger.error( "ERROR loading project from " + dir.getPath() 
+								+ ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e);
 						continue;
 					}
 
 					Project project = Project.projectFromObject(obj);
 					logger.info("Loading project " + project.getName());
 					projects.put(project.getName(), project);
-				
+
 					String source = project.getSource();
 					if (source == null) {
 						logger.info(project.getName() + ": No flows uploaded");
 						continue;
 					}
-					
+
 					File projectDir = new File(dir, source);
 					if (!projectDir.exists()) {
 						logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
-					}
+					} 
 					else if (!projectDir.isDirectory()) {
 						logger.error("ERROR project source dir " + projectDir + " is not a directory.");
-					}
+					} 
 					else {
 						File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
+						
 						Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
-						for (File flowFile: flowFiles) {
+						for (File flowFile : flowFiles) {
 							Object objectizedFlow = null;
 							try {
 								objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
-							} catch (IOException e) {
+							} 
+							catch (IOException e) {
 								logger.error("Error parsing flow file " + flowFile.toString(), e);
 								continue;
 							}
 
-							//Recreate Flow
+							// Recreate Flow
 							Flow flow = null;
-							
+
 							try {
 								flow = Flow.flowFromObject(objectizedFlow);
-							}
+							} 
 							catch (Exception e) {
-								logger.error("Error loading flow " + flowFile.getName() + " in project " + project.getName(), e);
+								logger.error(
+										"Error loading flow "
+												+ flowFile.getName()
+												+ " in project "
+												+ project.getName(), e);
 								continue;
 							}
 							logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
 							flow.initialize();
-							
+
 							flowMap.put(flow.getId(), flow);
 						}
 
@@ -160,14 +175,14 @@ public class FileProjectManager implements ProjectManager {
 			}
 		}
 	}
-	
+
 	public List<String> getProjectNames() {
 		return new ArrayList<String>(projects.keySet());
 	}
-	
+
 	public List<Project> getProjects(User user) {
 		ArrayList<Project> array = new ArrayList<Project>();
-		for(Project project : projects.values()) {
+		for (Project project : projects.values()) {
 			Permission perm = project.getUserPermission(user);
 
 			if (perm != null && (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ))) {
@@ -183,9 +198,8 @@ public class FileProjectManager implements ProjectManager {
 			Permission perm = project.getUserPermission(user);
 			if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
 				return project;
-			}
-			else {
-				throw new AccessControlException("Permission denied. Do not have read access.");
+			} else {
+				throw new AccessControlException( "Permission denied. Do not have read access.");
 			}
 		}
 		return project;
@@ -194,7 +208,7 @@ public class FileProjectManager implements ProjectManager {
 	public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
 		logger.info("Uploading files to " + projectName);
 		Project project = projects.get(projectName);
-		
+
 		if (project == null) {
 			throw new ProjectManagerException("Project not found.");
 		}
@@ -206,14 +220,15 @@ public class FileProjectManager implements ProjectManager {
 		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
 		loader.loadProjectFlow(dir);
 		Map<String, Flow> flows = loader.getFlowMap();
-				
+
 		File projectPath = new File(projectDirectory, projectName);
 		File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
+		
 		if (!installDir.mkdir()) {
 			throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
 		}
-		
-		for (Flow flow: flows.values()) {
+
+		for (Flow flow : flows.values()) {
 			try {
 				if (flow.getErrors() != null) {
 					errors.addAll(flow.getErrors());
@@ -221,18 +236,21 @@ public class FileProjectManager implements ProjectManager {
 				flow.initialize();
 
 				writeFlowFile(installDir, flow);
-			} catch (IOException e) {
-				throw new ProjectManagerException(
-						"Project directory " + projectName + " cannot be created in " + projectDirectory, e);
+			}
+			catch (IOException e) {
+				throw new ProjectManagerException("Project directory "
+						+ projectName + " cannot be created in "
+						+ projectDirectory, e);
 			}
 		}
-		
+
 		File destDirectory = new File(installDir, PROJECT_DIRECTORY);
 		dir.renameTo(destDirectory);
-		
+
 		// We install only if the project is not forced install or has no errors
 		if (force || errors.isEmpty()) {
-			// We synchronize on project so that we don't collide when uploading.
+			// We synchronize on project so that we don't collide when
+			// uploading.
 			synchronized (project) {
 				logger.info("Uploading files to " + projectName);
 				project.setSource(installDir.getName());
@@ -240,170 +258,185 @@ public class FileProjectManager implements ProjectManager {
 				project.setLastModifiedUser(uploader.getUserId());
 				project.setFlows(flows);
 			}
-			
+
 			try {
 				writeProjectFile(projectPath, project);
-			} catch (IOException e) {
-				throw new ProjectManagerException(
-						"Project directory " + projectName + 
-						" cannot be created in " + projectDirectory, e);
+			} 
+			catch (IOException e) {
+				throw new ProjectManagerException("Project directory "
+						+ projectName + " cannot be created in "
+						+ projectDirectory, e);
 			}
-		}
+		} 
 		else {
 			logger.info("Errors found loading project " + projectName);
 			StringBuffer bufferErrors = new StringBuffer();
-			for(String error : errors) {
+			for (String error : errors) {
 				bufferErrors.append(error);
 				bufferErrors.append("\n");
 			}
-			
+
 			throw new ProjectManagerException(bufferErrors.toString());
 		}
 	}
 
 	@Override
-	public synchronized Project createProject(String projectName, String description, User creator) throws ProjectManagerException {
+	public synchronized Project createProject(String projectName,
+			String description, User creator) throws ProjectManagerException {
 		if (projectName == null || projectName.trim().isEmpty()) {
 			throw new ProjectManagerException("Project name cannot be empty.");
-		}
+		} 
 		else if (description == null || description.trim().isEmpty()) {
 			throw new ProjectManagerException("Description cannot be empty.");
-		}
+		} 
 		else if (creator == null) {
 			throw new ProjectManagerException("Valid creator user must be set.");
-		}
-		else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")){
+		} 
+		else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")) {
 			throw new ProjectManagerException("Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
 		}
-	
+
 		if (projects.contains(projectName)) {
 			throw new ProjectManagerException("Project already exists.");
 		}
-		
+
 		File projectPath = new File(projectDirectory, projectName);
 		if (projectPath.exists()) {
 			throw new ProjectManagerException("Project already exists.");
 		}
-		
-		if(!projectPath.mkdirs()) {
-			throw new ProjectManagerException(
-					"Project directory " + projectName + 
-					" cannot be created in " + projectDirectory);
+
+		if (!projectPath.mkdirs()) {
+			throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory);
 		}
-		
+
 		Permission perm = new Permission(Type.ADMIN);
 		long time = System.currentTimeMillis();
-		
+
 		Project project = new Project(projectName);
 		project.setUserPermission(creator.getUserId(), perm);
 		project.setDescription(description);
 		project.setCreateTimestamp(time);
 		project.setLastModifiedTimestamp(time);
 		project.setLastModifiedUser(creator.getUserId());
-		
+
 		logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
 		try {
 			writeProjectFile(projectPath, project);
-		} catch (IOException e) {
-			throw new ProjectManagerException(
-					"Project directory " + projectName + 
-					" cannot be created in " + projectDirectory, e);
+		} 
+		catch (IOException e) {
+			throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory, e);
 		}
 		projects.put(projectName, project);
+		
 		return project;
 	}
-	
+
 	private synchronized void writeProjectFile(File directory, Project project) throws IOException {
 		Object object = project.toObject();
-		File tmpFile = File.createTempFile("project-",".json", directory);
-		
+		File tmpFile = File.createTempFile("project-", ".json", directory);
+
 		if (tmpFile.exists()) {
 			tmpFile.delete();
 		}
-		
+
 		logger.info("Writing project file " + tmpFile);
 		String output = JSONUtils.toJSON(object, true);
-		
+
 		FileWriter writer = new FileWriter(tmpFile);
 		try {
 			writer.write(output);
-		} catch (IOException e) {
+		} 
+		catch (IOException e) {
 			if (writer != null) {
 				writer.close();
 			}
-			
+
 			throw e;
 		}
 		writer.close();
-		
+
 		File projectFile = new File(directory, PROPERTIES_FILENAME);
 		File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
-		
+
 		projectFile.renameTo(swapFile);
 		tmpFile.renameTo(projectFile);
 		swapFile.delete();
-	
+
 	}
-	
+
 	private void writeFlowFile(File directory, Flow flow) throws IOException {
 		Object object = flow.toObject();
 		String filename = flow.getId() + FLOW_EXTENSION;
 		File outputFile = new File(directory, filename);
 		File oldOutputFile = new File(directory, filename + ".old");
-		
+
 		if (outputFile.exists()) {
 			outputFile.renameTo(oldOutputFile);
 		}
-		
+
 		logger.info("Writing flow file " + outputFile);
 		String output = JSONUtils.toJSON(object, true);
-		
+
 		FileWriter writer = new FileWriter(outputFile);
 		try {
 			writer.write(output);
-		} catch (IOException e) {
+		} 
+		catch (IOException e) {
 			if (writer != null) {
 				writer.close();
 			}
-			
+
 			throw e;
 		}
 		writer.close();
-		
+
 		if (oldOutputFile.exists()) {
 			oldOutputFile.delete();
 		}
 	}
-	
-	public Props getProperties(String projectName, String source, User user) throws ProjectManagerException {
+
+	@Override
+	public Props getProperties(String projectName, String source, User user)
+			throws ProjectManagerException {
 		Project project = projects.get(projectName);
 		if (project == null) {
 			throw new ProjectManagerException("Project " + project + " cannot be found.");
 		}
+
+		return getProperties(project, source, user);
+	}
+
+	@Override
+	public Props getProperties(Project project, String source, User user)
+			throws ProjectManagerException {
 		if (!project.hasPermission(user, Type.READ)) {
-			throw new AccessControlException("Permission denied. Do not have read access.");
+			throw new AccessControlException(
+					"Permission denied. Do not have read access.");
 		}
 
-		String mySource = projectName + File.separatorChar + project.getSource() + File.separatorChar + "src" + File.separatorChar + source;
+		String mySource = project.getName() + File.separatorChar
+				+ project.getSource() + File.separatorChar + "src"
+				+ File.separatorChar + source;
 		Element sourceElement = sourceCache.get(mySource);
 
 		if (sourceElement != null) {
-			return Props.clone((Props)sourceElement.getObjectValue());
+			return Props.clone((Props) sourceElement.getObjectValue());
 		}
-		
+
 		File file = new File(projectDirectory, mySource);
 		if (!file.exists()) {
 			throw new ProjectManagerException("Source file " + file.getAbsolutePath() + " doesn't exist.");
 		}
 
 		try {
-			Props props = new Props((Props)null, file);
+			Props props = new Props((Props) null, file);
 			return props;
-		} catch (IOException e) {
+		} 
+		catch (IOException e) {
 			throw new ProjectManagerException("Error loading file " + file.getPath(), e);
 		}
 	}
-	
+
 	@Override
 	public synchronized Project removeProject(String projectName, User user) {
 		return null;
@@ -411,15 +444,15 @@ public class FileProjectManager implements ProjectManager {
 
 	private static class SuffixFilter implements FileFilter {
 		private String suffix;
-		
+
 		public SuffixFilter(String suffix) {
 			this.suffix = suffix;
 		}
-	
+
 		@Override
 		public boolean accept(File pathname) {
 			String name = pathname.getName();
-			
+
 			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
 		}
 	}
@@ -430,15 +463,39 @@ public class FileProjectManager implements ProjectManager {
 		if (project == null) {
 			throw new ProjectManagerException("Project " + projectName + " doesn't exist.");
 		}
-		
+
 		File projectPath = new File(projectDirectory, projectName);
 		try {
 			writeProjectFile(projectPath, project);
-		}
+		} 
 		catch (IOException e) {
 			throw new ProjectManagerException("Error committing project " + projectName, e);
 		}
 	}
 
+	@Override
+	public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException {
+		Flow flow = project.getFlow(flowId);
+		if (flow == null) {
+			throw new ProjectManagerException("Flow " + flowId + " doesn't exist in " + project.getName());
+		}
+
+		// Resolve all the node probs
+		HashMap<String, Props> sourceMap = new HashMap<String, Props>();
+		for (Node node : flow.getNodes()) {
+			String source = node.getJobSource();
+			Props props = getProperties(project, node.getJobSource(), user);
+			sourceMap.put(source, props);
+		}
+
+		// Resolve all the shared props.
+		for(FlowProps flowProps: flow.getAllFlowProps().values()) {
+			String source = flowProps.getSource();
+			Props props = getProperties(project, source, user);
+			sourceMap.put(source, props);
+		}
+		
+		return sourceMap;
+	}
 
 }
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 43f654c..4abf57c 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -1,6 +1,7 @@
 package azkaban.project;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.List;
 
 import azkaban.user.User;
@@ -23,4 +24,8 @@ public interface ProjectManager {
 	public Project removeProject(String projectName, User user) throws ProjectManagerException;
 
 	public Props getProperties(String projectName, String source, User user) throws ProjectManagerException;
+
+	public Props getProperties(Project project, String source, User user) throws ProjectManagerException;
+
+	public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException;
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index d56eae7..d14ed46 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,6 +35,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import azkaban.executor.ExecutorManager;
 import azkaban.project.FileProjectManager;
 import azkaban.project.ProjectManager;
 import azkaban.user.UserManager;
@@ -42,6 +43,7 @@ import azkaban.user.XmlUserManager;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
+import azkaban.webapp.servlet.FlowExecutorServlet;
 import azkaban.webapp.servlet.IndexServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
 import azkaban.webapp.session.SessionCache;
@@ -53,373 +55,385 @@ import joptsimple.OptionSpec;
 /**
  * The Azkaban Jetty server class
  * 
- * Global azkaban properties for setup. All of them are optional unless otherwise marked:
- * azkaban.name - The displayed name of this instance.
- * azkaban.label - Short descriptor of this Azkaban instance.
- * azkaban.color - Theme color
- * azkaban.temp.dir - Temp dir used by Azkaban for various file uses.
- * web.resource.dir - The directory that contains the static web files.
+ * Global azkaban properties for setup. All of them are optional unless
+ * otherwise marked: azkaban.name - The displayed name of this instance.
+ * azkaban.label - Short descriptor of this Azkaban instance. azkaban.color -
+ * Theme color azkaban.temp.dir - Temp dir used by Azkaban for various file
+ * uses. web.resource.dir - The directory that contains the static web files.
  * default.timezone.id - The timezone code. I.E. America/Los Angeles
  * 
- * user.manager.class - The UserManager class used for the user manager. Default is XmlUserManager.
- * project.manager.class - The ProjectManager to load projects
- * project.global.properties - The base properties inherited by all projects and jobs
+ * user.manager.class - The UserManager class used for the user manager. Default
+ * is XmlUserManager. project.manager.class - The ProjectManager to load
+ * projects project.global.properties - The base properties inherited by all
+ * projects and jobs
  * 
- * jetty.maxThreads - # of threads for jetty
- * jetty.ssl.port - The ssl port used for sessionizing.
- * jetty.keystore - Jetty keystore .
- * jetty.keypassword - Jetty keystore password
- * jetty.truststore - Jetty truststore
- * jetty.trustpassword - Jetty truststore password
+ * jetty.maxThreads - # of threads for jetty jetty.ssl.port - The ssl port used
+ * for sessionizing. jetty.keystore - Jetty keystore . jetty.keypassword - Jetty
+ * keystore password jetty.truststore - Jetty truststore jetty.trustpassword -
+ * Jetty truststore password
  */
 public class AzkabanWebServer {
-    private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
-	
-    public static final String AZKABAN_HOME = "AZKABAN_HOME";
-    public static final String DEFAULT_CONF_PATH = "conf";
-    public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
-
-    private static AzkabanWebServer app;
-    
-    private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
-    //private static final int DEFAULT_PORT_NUMBER = 8081;
-    private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
-    private static final int DEFAULT_THREAD_NUMBER = 20;
-    private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
-    private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
-    private static final String PROJECT_MANAGER_CLASS_PARAM = "project.manager.class";
-    private static final String DEFAULT_STATIC_DIR = "";
-
-    private final VelocityEngine velocityEngine;
-    private UserManager userManager;
-    private ProjectManager projectManager;    
-    
-    private Props props;
-    private SessionCache sessionCache;
-    private File tempDir;
-
-    /**
-     * Constructor usually called by tomcat AzkabanServletContext to create the
-     * initial server
-     */
-    public AzkabanWebServer() {
-        this(loadConfigurationFromAzkabanHome());
-    }
+	private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
+
+	public static final String AZKABAN_HOME = "AZKABAN_HOME";
+	public static final String DEFAULT_CONF_PATH = "conf";
+	public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+
+	private static AzkabanWebServer app;
+
+	private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+	// private static final int DEFAULT_PORT_NUMBER = 8081;
+	private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
+	private static final int DEFAULT_THREAD_NUMBER = 20;
+	private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
+	private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
+	private static final String PROJECT_MANAGER_CLASS_PARAM = "project.manager.class";
+	private static final String DEFAULT_STATIC_DIR = "";
+
+	private final VelocityEngine velocityEngine;
+	private UserManager userManager;
+	private ProjectManager projectManager;
+	private ExecutorManager executorManager;
+
+	private Props props;
+	private SessionCache sessionCache;
+	private File tempDir;
+
+	/**
+	 * Constructor usually called by tomcat AzkabanServletContext to create the
+	 * initial server
+	 */
+	public AzkabanWebServer() {
+		this(loadConfigurationFromAzkabanHome());
+	}
+
+	/**
+	 * Constructor
+	 */
+	public AzkabanWebServer(Props props) {
+		this.props = props;
+		velocityEngine = configureVelocityEngine(props.getBoolean( VELOCITY_DEV_MODE_PARAM, false));
+		sessionCache = new SessionCache(props);
+		userManager = loadUserManager(props);
+		projectManager = loadProjectManager(props);
+		executorManager = loadExecutorManager(props);
+
+		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
+
+		// Setup time zone
+		if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
+			String timezone = props.getString(DEFAULT_TIMEZONE_ID);
+			TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+			DateTimeZone.setDefault(DateTimeZone.forID(timezone));
 
-    /**
-     * Constructor
-     */
-    public AzkabanWebServer(Props props) {
-        this.props = props;
-        velocityEngine = configureVelocityEngine(props.getBoolean(
-                VELOCITY_DEV_MODE_PARAM, false));
-        sessionCache = new SessionCache(props);
-        userManager = loadUserManager(props);
-        projectManager = loadProjectManager(props);
-        
-        tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
-
-        // Setup time zone
-        if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
-            String timezone = props.getString(DEFAULT_TIMEZONE_ID);
-            TimeZone.setDefault(TimeZone.getTimeZone(timezone));
-            DateTimeZone.setDefault(DateTimeZone.forID(timezone));
-            
 			logger.info("Setting timezone to " + timezone);
-        }
-
-    }
-    
-    private UserManager loadUserManager(Props props) {
-        Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM,
-                null);
-        logger.info("Loading user manager class " + userManagerClass.getName());
-        UserManager manager = null;
-
-        if (userManagerClass != null
-            && userManagerClass.getConstructors().length > 0) {
-
-        	try {
-        		Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
-        		manager = (UserManager)userManagerConstructor.newInstance(props);
-        	}
-        	catch (Exception e) {
-        	      logger.error("Could not instantiate UserManager "
-                          + userManagerClass.getName());
-                  throw new RuntimeException(e);
-        	}
-
-        } else {
-            manager = new XmlUserManager(props);
-        }
-
-        return manager;
-    }
-
-    private ProjectManager loadProjectManager(Props props) {
-        Class<?> projectManagerClass = props.getClass(PROJECT_MANAGER_CLASS_PARAM, null);
-        logger.info("Loading project manager class " + projectManagerClass.getName());
-        ProjectManager manager = null;
-
-        if (projectManagerClass != null
-            && projectManagerClass.getConstructors().length > 0) {
-
-        	try {
-        		Constructor<?> projectManagerConstructor = projectManagerClass.getConstructor(Props.class);
-        		manager = (ProjectManager)projectManagerConstructor.newInstance(props);
-        	}
-        	catch (Exception e) {
-        	      logger.error("Could not instantiate ProjectManager "
-                          + projectManagerClass.getName());
-                  throw new RuntimeException(e);
-        	}
-
-        } else {
-            manager = new FileProjectManager(props);
-        }
-
-        return manager;
-    }
-    
-    /**
-     * Returns the web session cache.
-     * 
-     * @return
-     */
-    public SessionCache getSessionCache() {
-        return sessionCache;
-    }
-
-    /**
-     * Returns the velocity engine for pages to use.
-     * 
-     * @return
-     */
-    public VelocityEngine getVelocityEngine() {
-        return velocityEngine;
-    }
-
-    /**
-     * 
-     * @return
-     */
-    public UserManager getUserManager() {
-        return userManager;
-    }
-    
-    /**
-     * 
-     * @return
-     */
-    public ProjectManager getProjectManager() {
-        return projectManager;
-    }
-    
-    /**
-     * Creates and configures the velocity engine.
-     * 
-     * @param devMode
-     * @return
-     */
-    private VelocityEngine configureVelocityEngine(final boolean devMode) {
-        VelocityEngine engine = new VelocityEngine();
-        engine.setProperty("resource.loader", "classpath");
-        engine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
-        engine.setProperty("classpath.resource.loader.cache", !devMode);
-        engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
-        engine.setProperty("resource.manager.logwhenfound", false);
-        engine.setProperty("input.encoding", "UTF-8");
-        engine.setProperty("output.encoding", "UTF-8");
-        engine.setProperty("directive.set.null.allowed", true);
-        engine.setProperty("resource.manager.logwhenfound", false);
-        engine.setProperty("velocimacro.permissions.allow.inline", true);
-        engine.setProperty("velocimacro.library.autoreload", devMode);
-        engine.setProperty("velocimacro.library", "/azkaban/webapp/servlet/velocity/macros.vm");
-        engine.setProperty("velocimacro.permissions.allow.inline.to.replace.global", true);
-        engine.setProperty("velocimacro.arguments.strict", true);
-        engine.setProperty("runtime.log.invalid.references", devMode);
-        engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
-        engine.setProperty("runtime.log.logsystem.log4j.logger", Logger.getLogger("org.apache.velocity.Logger"));
-        engine.setProperty("parser.pool.size", 3);
-        return engine;
-    }
-
-    /**
-     * Returns the global azkaban properties
-     * 
-     * @return
-     */
-    public Props getAzkabanProps() {
-        return props;
-    }
-
-    /**
-     * Azkaban using Jetty
-     * 
-     * @param args
-     */
-    public static void main(String[] args) {
-        OptionParser parser = new OptionParser();
-
-        OptionSpec<String> configDirectory = parser
-                .acceptsAll(Arrays.asList("c", "conf"),"The conf directory for Azkaban.")
-                .withRequiredArg()
-                .describedAs("conf")
-                .ofType(String.class);
-
-        logger.error("Starting Jetty Azkaban...");
-
-        // Grabbing the azkaban settings from the conf directory.
-        Props azkabanSettings = null;
-        OptionSet options = parser.parse(args);
-        if (options.has(configDirectory)) {
-            String path = options.valueOf(configDirectory);
-            logger.info("Loading azkaban settings file from " + path);
-            File file = new File(path, AZKABAN_PROPERTIES_FILE);
-            if (!file.exists() || file.isDirectory() || !file.canRead()) {
-                logger.error("Cannot read file " + file);
-            }
-
-            azkabanSettings = loadAzkabanConfiguration(file.getPath());
-        } else {
-            logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
-            azkabanSettings = loadConfigurationFromAzkabanHome();
-        }
-
-        if (azkabanSettings == null) {
-        	// one last chance to 
-        }
-        
-        if (azkabanSettings == null) {
-            logger.error("Azkaban Properties not loaded.");
-            logger.error("Exiting Azkaban...");
-            return;
-        }
-        app = new AzkabanWebServer(azkabanSettings);
-
-        //int portNumber = azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
-        int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",DEFAULT_SSL_PORT_NUMBER);
-        int maxThreads = azkabanSettings.getInt("jetty.maxThreads",DEFAULT_THREAD_NUMBER);
-
-        logger.info("Setting up Jetty Server with port:" + sslPortNumber
-                + " and numThreads:" + maxThreads);
-
-        final Server server = new Server();
-        SslSocketConnector secureConnector = new SslSocketConnector();
-        secureConnector.setPort(sslPortNumber);
-        secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
-        secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
-        secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
-        secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
-        secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
-        server.addConnector(secureConnector);
-        
-        QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
-        server.setThreadPool(httpThreadPool);
-
-        String staticDir = azkabanSettings.getString("web.resource.dir",DEFAULT_STATIC_DIR);
-        logger.info("Setting up web resource dir " + staticDir);
-        Context root = new Context(server, "/", Context.SESSIONS);
-
-        root.setResourceBase(staticDir);
-        ServletHolder index = new ServletHolder(new IndexServlet());
-        root.addServlet(index, "/index");
-        root.addServlet(index, "/");
-        
-        ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
-        root.addServlet(staticServlet, "/css/*");
-        root.addServlet(staticServlet, "/js/*");
-        root.addServlet(staticServlet, "/images/*");
-        root.addServlet(staticServlet, "/favicon.ico");
-        
-        root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager");
-        root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
-
-        try {
-            server.start();
-        } catch (Exception e) {
-            logger.warn(e);
-            Utils.croak(e.getMessage(), 1);
-        }
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-
-            public void run() {
-                logger.info("Shutting down http server...");
-                try {
-                    server.stop();
-                    server.destroy();
-                } catch (Exception e) {
-                    logger.error("Error while shutting down http server.", e);
-                }
-                logger.info("kk thx bye.");
-            }
-        });
-        logger.info("Server running on port " + sslPortNumber + ".");
-    }
-
-    /**
-     * Loads the Azkaban property file from the AZKABAN_HOME conf directory
-     * 
-     * @return
-     */
-    private static Props loadConfigurationFromAzkabanHome() {
-        String azkabanHome = System.getenv("AZKABAN_HOME");
-
-        if (azkabanHome == null) {
-            logger.error("AZKABAN_HOME not set. Will try default.");
-            return null;
-        }
-
-        if (!new File(azkabanHome).isDirectory()
-                || !new File(azkabanHome).canRead()) {
-            logger.error(azkabanHome + " is not a readable directory.");
-            return null;
-        }
-
-        File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
-        if (!confPath.exists() || !confPath.isDirectory()
-                || !confPath.canRead()) {
-            logger.error(azkabanHome
-                    + " does not contain a readable conf directory.");
-            return null;
-        }
-
-        File confFile = new File(confPath, AZKABAN_PROPERTIES_FILE);
-        if (!confFile.exists() || confFile.isDirectory() || !confPath.canRead()) {
-            logger.error(confFile
-                    + " does not contain a readable azkaban.properties file.");
-            return null;
-        }
-
-        return loadAzkabanConfiguration(confFile.getPath());
-    }
-
-    /**
-     * Returns the set temp dir
-     * @return
-     */
-    public File getTempDirectory() {
-        return tempDir;
-    }
-
-    /**
-     * Loads the Azkaban conf file int a Props object
+		}
+
+	}
+
+	private UserManager loadUserManager(Props props) {
+		Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
+		logger.info("Loading user manager class " + userManagerClass.getName());
+		UserManager manager = null;
+
+		if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
+
+			try {
+				Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
+				manager = (UserManager) userManagerConstructor.newInstance(props);
+			} 
+			catch (Exception e) {
+				logger.error("Could not instantiate UserManager "+ userManagerClass.getName());
+				throw new RuntimeException(e);
+			}
+
+		} 
+		else {
+			manager = new XmlUserManager(props);
+		}
+
+		return manager;
+	}
+
+	private ProjectManager loadProjectManager(Props props) {
+		Class<?> projectManagerClass = props.getClass(PROJECT_MANAGER_CLASS_PARAM, null);
+		logger.info("Loading project manager class " + projectManagerClass.getName());
+		ProjectManager manager = null;
+
+		if (projectManagerClass != null && projectManagerClass.getConstructors().length > 0) {
+
+			try {
+				Constructor<?> projectManagerConstructor = projectManagerClass.getConstructor(Props.class);
+				manager = (ProjectManager) projectManagerConstructor.newInstance(props);
+			} 
+			catch (Exception e) {
+				logger.error("Could not instantiate ProjectManager " + projectManagerClass.getName());
+				throw new RuntimeException(e);
+			}
+
+		}
+		else {
+			manager = new FileProjectManager(props);
+		}
+
+		return manager;
+	}
+
+	private ExecutorManager loadExecutorManager(Props props) {
+		ExecutorManager execManager = new ExecutorManager(props);
+		return execManager;
+	}
+
+	/**
+	 * Returns the web session cache.
+	 * 
+	 * @return
+	 */
+	public SessionCache getSessionCache() {
+		return sessionCache;
+	}
+
+	/**
+	 * Returns the velocity engine for pages to use.
+	 * 
+	 * @return
+	 */
+	public VelocityEngine getVelocityEngine() {
+		return velocityEngine;
+	}
+
+	/**
+	 * 
+	 * @return
+	 */
+	public UserManager getUserManager() {
+		return userManager;
+	}
+
+	/**
+	 * 
+	 * @return
+	 */
+	public ProjectManager getProjectManager() {
+		return projectManager;
+	}
+
+	/**
      * 
-     * @param path
-     * @return
      */
-    private static Props loadAzkabanConfiguration(String path) {
-        try {
-            return new Props(null, path);
-        } catch (FileNotFoundException e) {
-            logger.error("File not found. Could not load azkaban config file "
-                    + path);
-        } catch (IOException e) {
-            logger.error("File found, but error reading. Could not load azkaban config file "
-                    + path);
-        }
-
-        return null;
-    }
+	public ExecutorManager getExecutorManager() {
+		return executorManager;
+	}
+
+	/**
+	 * Creates and configures the velocity engine.
+	 * 
+	 * @param devMode
+	 * @return
+	 */
+	private VelocityEngine configureVelocityEngine(final boolean devMode) {
+		VelocityEngine engine = new VelocityEngine();
+		engine.setProperty("resource.loader", "classpath");
+		engine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+		engine.setProperty("classpath.resource.loader.cache", !devMode);
+		engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
+		engine.setProperty("resource.manager.logwhenfound", false);
+		engine.setProperty("input.encoding", "UTF-8");
+		engine.setProperty("output.encoding", "UTF-8");
+		engine.setProperty("directive.set.null.allowed", true);
+		engine.setProperty("resource.manager.logwhenfound", false);
+		engine.setProperty("velocimacro.permissions.allow.inline", true);
+		engine.setProperty("velocimacro.library.autoreload", devMode);
+		engine.setProperty("velocimacro.library", "/azkaban/webapp/servlet/velocity/macros.vm");
+		engine.setProperty("velocimacro.permissions.allow.inline.to.replace.global", true);
+		engine.setProperty("velocimacro.arguments.strict", true);
+		engine.setProperty("runtime.log.invalid.references", devMode);
+		engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
+		engine.setProperty("runtime.log.logsystem.log4j.logger", Logger.getLogger("org.apache.velocity.Logger"));
+		engine.setProperty("parser.pool.size", 3);
+		return engine;
+	}
+
+	/**
+	 * Returns the global azkaban properties
+	 * 
+	 * @return
+	 */
+	public Props getAzkabanProps() {
+		return props;
+	}
+
+	/**
+	 * Azkaban using Jetty
+	 * 
+	 * @param args
+	 */
+	public static void main(String[] args) {
+		OptionParser parser = new OptionParser();
+
+		OptionSpec<String> configDirectory = parser
+				.acceptsAll(Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
+				.withRequiredArg()
+				.describedAs("conf").ofType(String.class);
+
+		logger.error("Starting Jetty Azkaban...");
+
+		// Grabbing the azkaban settings from the conf directory.
+		Props azkabanSettings = null;
+		OptionSet options = parser.parse(args);
+		if (options.has(configDirectory)) {
+			String path = options.valueOf(configDirectory);
+			logger.info("Loading azkaban settings file from " + path);
+			File file = new File(path, AZKABAN_PROPERTIES_FILE);
+			if (!file.exists() || file.isDirectory() || !file.canRead()) {
+				logger.error("Cannot read file " + file);
+			}
+
+			azkabanSettings = loadAzkabanConfiguration(file.getPath());
+		} 
+		else {
+			logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
+			azkabanSettings = loadConfigurationFromAzkabanHome();
+		}
+
+		if (azkabanSettings == null) {
+			// one last chance to
+		}
+
+		if (azkabanSettings == null) {
+			logger.error("Azkaban Properties not loaded.");
+			logger.error("Exiting Azkaban...");
+			return;
+		}
+		app = new AzkabanWebServer(azkabanSettings);
+
+		// int portNumber =
+		// azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
+		int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",
+				DEFAULT_SSL_PORT_NUMBER);
+		int maxThreads = azkabanSettings.getInt("jetty.maxThreads",
+				DEFAULT_THREAD_NUMBER);
+
+		logger.info("Setting up Jetty Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
+
+		final Server server = new Server();
+		SslSocketConnector secureConnector = new SslSocketConnector();
+		secureConnector.setPort(sslPortNumber);
+		secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
+		secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
+		secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
+		secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
+		secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
+		server.addConnector(secureConnector);
+
+		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+		server.setThreadPool(httpThreadPool);
+
+		String staticDir = azkabanSettings.getString("web.resource.dir", DEFAULT_STATIC_DIR);
+		logger.info("Setting up web resource dir " + staticDir);
+		Context root = new Context(server, "/", Context.SESSIONS);
+
+		root.setResourceBase(staticDir);
+		ServletHolder index = new ServletHolder(new IndexServlet());
+		root.addServlet(index, "/index");
+		root.addServlet(index, "/");
+
+		ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
+		root.addServlet(staticServlet, "/css/*");
+		root.addServlet(staticServlet, "/js/*");
+		root.addServlet(staticServlet, "/images/*");
+		root.addServlet(staticServlet, "/favicon.ico");
+
+		root.addServlet(new ServletHolder(new ProjectManagerServlet()),"/manager");
+		root.addServlet(new ServletHolder(new FlowExecutorServlet()),"/executor");
+
+		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
+
+		try {
+			server.start();
+		} 
+		catch (Exception e) {
+			logger.warn(e);
+			Utils.croak(e.getMessage(), 1);
+		}
+
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+
+			public void run() {
+				logger.info("Shutting down http server...");
+				try {
+					server.stop();
+					server.destroy();
+				} 
+				catch (Exception e) {
+					logger.error("Error while shutting down http server.", e);
+				}
+				logger.info("kk thx bye.");
+			}
+		});
+		logger.info("Server running on port " + sslPortNumber + ".");
+	}
+
+	/**
+	 * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+	 * 
+	 * @return
+	 */
+	private static Props loadConfigurationFromAzkabanHome() {
+		String azkabanHome = System.getenv("AZKABAN_HOME");
+
+		if (azkabanHome == null) {
+			logger.error("AZKABAN_HOME not set. Will try default.");
+			return null;
+		}
+
+		if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
+			logger.error(azkabanHome + " is not a readable directory.");
+			return null;
+		}
+
+		File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+		if (!confPath.exists() || !confPath.isDirectory()
+				|| !confPath.canRead()) {
+			logger.error(azkabanHome + " does not contain a readable conf directory.");
+			return null;
+		}
+
+		File confFile = new File(confPath, AZKABAN_PROPERTIES_FILE);
+		if (!confFile.exists() || confFile.isDirectory() || !confPath.canRead()) {
+			logger.error(confFile + " does not contain a readable azkaban.properties file.");
+			return null;
+		}
+
+		return loadAzkabanConfiguration(confFile.getPath());
+	}
+
+	/**
+	 * Returns the set temp dir
+	 * 
+	 * @return
+	 */
+	public File getTempDirectory() {
+		return tempDir;
+	}
+
+	/**
+	 * Loads the Azkaban conf file int a Props object
+	 * 
+	 * @param path
+	 * @return
+	 */
+	private static Props loadAzkabanConfiguration(String path) {
+		try {
+			return new Props(null, path);
+		} 
+		catch (FileNotFoundException e) {
+			logger.error("File not found. Could not load azkaban config file " + path);
+		} 
+		catch (IOException e) {
+			logger.error("File found, but error reading. Could not load azkaban config file " + path);
+		}
+
+		return null;
+	}
 }
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 139b1e8..4d100a5 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -18,6 +18,7 @@ package azkaban.webapp.servlet;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -43,8 +44,7 @@ import azkaban.webapp.session.Session;
  * Base Servlet for pages
  */
 public abstract class AbstractAzkabanServlet extends HttpServlet {
-	private static final DateTimeFormatter ZONE_FORMATTER = DateTimeFormat
-			.forPattern("z");
+	private static final DateTimeFormatter ZONE_FORMATTER = DateTimeFormat.forPattern("z");
 	private static final String AZKABAN_SUCCESS_MESSAGE = "azkaban.success.message";
 	private static final String AZKABAN_FAILURE_MESSAGE = "azkaban.failure.message";
 
@@ -71,10 +71,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 
 	@Override
 	public void init(ServletConfig config) throws ServletException {
-		application = (AzkabanWebServer) config
-				.getServletContext()
-				.getAttribute(
-						AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+		application = (AzkabanWebServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
 
 		if (application == null) {
 			throw new IllegalStateException(
@@ -107,14 +104,14 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @return
 	 * @throws ServletException
 	 */
-	public String getParam(HttpServletRequest request, String name)
-			throws ServletException {
+	public String getParam(HttpServletRequest request, String name) throws ServletException {
 		String p = request.getParameter(name);
-		if (p == null)
-			throw new ServletException("Missing required parameter '" + name
-					+ "'.");
-		else
+		if (p == null) {
+			throw new ServletException("Missing required parameter '" + name + "'.");
+		}
+		else {
 			return p;
+		}
 	}
 
 	/**
@@ -126,12 +123,26 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @return
 	 * @throws ServletException
 	 */
-	public int getIntParam(HttpServletRequest request, String name)
-			throws ServletException {
+	public int getIntParam(HttpServletRequest request, String name) throws ServletException {
 		String p = getParam(request, name);
 		return Integer.parseInt(p);
 	}
 
+	public Map<String, String> getParamGroup(HttpServletRequest request, String groupName)  throws ServletException {
+		Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
+		String matchString = groupName + "[";
+
+		HashMap<String, String> groupParam = new HashMap<String, String>();
+		while( enumerate.hasMoreElements() ) {
+			String str = (String)enumerate.nextElement();
+			if (str.startsWith(matchString)) {
+				groupParam.put(str.substring(matchString.length(), str.length() - 1), request.getParameter(str));
+			}
+			
+		}
+		return groupParam;
+	}
+	
 	/**
 	 * Returns the session value of the request.
 	 * 
@@ -139,8 +150,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param key
 	 * @param value
 	 */
-	protected void setSessionValue(HttpServletRequest request, String key,
-			Object value) {
+	protected void setSessionValue(HttpServletRequest request, String key, Object value) {
 		request.getSession(true).setAttribute(key, value);
 	}
 
@@ -152,8 +162,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param value
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	protected void addSessionValue(HttpServletRequest request, String key,
-			Object value) {
+	protected void addSessionValue(HttpServletRequest request, String key, Object value) {
 		List l = (List) request.getSession(true).getAttribute(key);
 		if (l == null)
 			l = new ArrayList();
@@ -168,8 +177,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param response
 	 * @param errorMsg
 	 */
-	protected void setErrorMessageInCookie(HttpServletResponse response,
-			String errorMsg) {
+	protected void setErrorMessageInCookie(HttpServletResponse response, String errorMsg) {
 		Cookie cookie = new Cookie(AZKABAN_FAILURE_MESSAGE, errorMsg);
 		response.addCookie(cookie);
 	}
@@ -181,8 +189,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param response
 	 * @param errorMsg
 	 */
-	protected void setSuccessMessageInCookie(HttpServletResponse response,
-			String message) {
+	protected void setSuccessMessageInCookie(HttpServletResponse response, String message) {
 		Cookie cookie = new Cookie(AZKABAN_SUCCESS_MESSAGE, message);
 		response.addCookie(cookie);
 	}
@@ -245,10 +252,8 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param template
 	 * @return
 	 */
-	protected Page newPage(HttpServletRequest req, HttpServletResponse resp,
-			Session session, String template) {
-		Page page = new Page(req, resp, application.getVelocityEngine(),
-				template);
+	protected Page newPage(HttpServletRequest req, HttpServletResponse resp, Session session, String template) {
+		Page page = new Page(req, resp, application.getVelocityEngine(), template);
 		page.add("azkaban_name", name);
 		page.add("azkaban_label", label);
 		page.add("azkaban_color", color);
@@ -260,14 +265,11 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		page.add("context", req.getContextPath());
 
 		String errorMsg = getErrorMessageFromCookie(req);
-		page.add("error_message",
-				errorMsg == null || errorMsg.isEmpty() ? "null" : errorMsg);
+		page.add("error_message", errorMsg == null || errorMsg.isEmpty() ? "null" : errorMsg);
 		setErrorMessageInCookie(resp, null);
 
 		String successMsg = getSuccessMessageFromCookie(req);
-		page.add("success_message",
-				successMsg == null || successMsg.isEmpty() ? "null"
-						: successMsg);
+		page.add("success_message", successMsg == null || successMsg.isEmpty() ? "null" : successMsg);
 		setSuccessMessageInCookie(resp, null);
 
 		return page;
@@ -281,10 +283,8 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param template
 	 * @return
 	 */
-	protected Page newPage(HttpServletRequest req, HttpServletResponse resp,
-			String template) {
-		Page page = new Page(req, resp, application.getVelocityEngine(),
-				template);
+	protected Page newPage(HttpServletRequest req, HttpServletResponse resp, String template) {
+		Page page = new Page(req, resp, application.getVelocityEngine(), template);
 		page.add("azkaban_name", name);
 		page.add("azkaban_label", label);
 		page.add("azkaban_color", color);
@@ -301,8 +301,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @param obj
 	 * @throws IOException
 	 */
-	protected void writeJSON(HttpServletResponse resp, Object obj)
-			throws IOException {
+	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
 		resp.setContentType(JSON_MIME_TYPE);
 		ObjectMapper mapper = new ObjectMapper();
 		mapper.writeValue(resp.getOutputStream(), obj);
@@ -315,21 +314,17 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @return
 	 */
 	public static AzkabanWebServer getApp(ServletConfig config) {
-		AzkabanWebServer app = (AzkabanWebServer) config
-				.getServletContext()
-				.getAttribute(
-						AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+		AzkabanWebServer app = (AzkabanWebServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
 
 		if (app == null) {
-			throw new IllegalStateException(
-					"No batch application is defined in the servlet context!");
-		} else {
+			throw new IllegalStateException("No batch application is defined in the servlet context!");
+		} 
+		else {
 			return app;
 		}
 	}
 
-	public static String createJsonResponse(String status, String message,
-			String action, Map<String, Object> params) {
+	public static String createJsonResponse(String status, String message, String action, Map<String, Object> params) {
 		HashMap<String, Object> response = new HashMap<String, Object>();
 		response.put("status", status);
 		if (message != null) {
diff --git a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
index f4a755b..7c5745a 100644
--- a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
+++ b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
@@ -42,7 +42,6 @@ public class AzkabanServletContextListener implements ServletContextListener {
 	public void contextInitialized(ServletContextEvent event) {
 		this.app = new AzkabanWebServer();
 
-		event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY,
-				this.app);
+		event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY, this.app);
 	}
 }
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index c591351..2c643e0 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -1,24 +1,112 @@
 package azkaban.webapp.servlet;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
+import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.User;
+import azkaban.user.Permission.Type;
+import azkaban.utils.Props;
 import azkaban.webapp.session.Session;
 
 public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
+	private static final long serialVersionUID = 1L;
+	private ProjectManager projectManager;
+	private ExecutorManager executorManager;
+	private File tempDir;
+
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		super.init(config);
+		projectManager = this.getApplication().getProjectManager();
+		executorManager = this.getApplication().getExecutorManager();
+		tempDir = this.getApplication().getTempDirectory();
+	}
 
 	@Override
 	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		
+		if (hasParam(req, "ajax")) {
+			handleAJAXAction(req, resp, session);
+		}
 	}
 
 	@Override
 	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		if (hasParam(req, "ajax")) {
+			handleAJAXAction(req, resp, session);
+		}
+	}
+
+	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		String projectName = getParam(req, "project");
 
+		HashMap<String, Object> ret = new HashMap<String, Object>();
+		ret.put("project", projectName);
 		
+		String ajaxName = getParam(req, "ajax");
+		if (ajaxName.equals("executeFlow")) {
+			ajaxExecuteFlow(req, resp, ret, session.getUser());
+		}
+		
+		this.writeJSON(resp, ret);
 	}
+	
+	private void ajaxExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+		String projectId = getParam(req, "project");
+		String flowId = getParam(req, "flow");
+		
+		ret.put("flow", flowId);
+		
+		Project project = projectManager.getProject(projectId, user);
+		if (project == null) {
+			ret.put("error", "Project " + projectId + " does not exist");
+			return;
+		}
+		
+		if (!project.hasPermission(user, Type.EXECUTE)) {
+			ret.put("error", "Permission denied. Cannot execute " + flowId);
+			return;
+		}
 
+		Flow flow = project.getFlow(flowId);
+		if (flow == null) {
+			ret.put("error", "Flow " + flowId + " cannot be found in project " + project);
+			return;
+		}
+		
+		HashMap<String, Props> sources;
+		try {
+			sources = projectManager.getAllFlowProperties(project, flowId, user);
+		}
+		catch (ProjectManagerException e) {
+			ret.put("error", e.getMessage());
+			return;
+		}
+		
+		ExecutableFlow exflow = ExecutableFlow.createExecutableFlow(flow, sources);
+		
+		Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
+		for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
+			boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
+			exflow.setStatus(entry.getKey(), nodeDisabled ? Status.IGNORED : Status.READY);
+		}
+		
+		executorManager.executeFlow(exflow);
+		String execId = exflow.getExecutionId();
+		
+		ret.put("execid", "test");
+	}
 }
diff --git a/src/java/azkaban/webapp/servlet/IndexServlet.java b/src/java/azkaban/webapp/servlet/IndexServlet.java
index 91ab4ba..69348d7 100644
--- a/src/java/azkaban/webapp/servlet/IndexServlet.java
+++ b/src/java/azkaban/webapp/servlet/IndexServlet.java
@@ -40,21 +40,18 @@ public class IndexServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = -1;
 
 	@Override
-	protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
-			Session session) throws ServletException, IOException {
+	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
 		User user = session.getUser();
 
 		ProjectManager manager = this.getApplication().getProjectManager();
 		List<Project> projects = manager.getProjects(user);
-		Page page = newPage(req, resp, session,
-				"azkaban/webapp/servlet/velocity/index.vm");
+		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/index.vm");
 		page.add("projects", projects);
 		page.render();
 	}
 
 	@Override
-	protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
-			Session session) throws ServletException, IOException {
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
 		if (hasParam(req, "action")) {
 			String action = getParam(req, "action");
 			if (action.equals("create")) {
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index c7da369..4492bca 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -38,8 +38,7 @@ public abstract class LoginAbstractAzkabanServlet extends
 		if (hasParam(req, "logout")) {
 			resp.sendRedirect(req.getContextPath());
 			if (session != null) {
-				getApplication().getSessionCache().removeSession(
-						session.getSessionId());
+				getApplication().getSessionCache().removeSession(session.getSessionId());
 			}
 			return;
 		}
@@ -77,13 +76,11 @@ public abstract class LoginAbstractAzkabanServlet extends
 		}
 	}
 
-	private void handleLogin(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException {
+	private void handleLogin(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		handleLogin(req, resp, null);
 	}
 
-	private void handleLogin(HttpServletRequest req, HttpServletResponse resp,
-			String errorMsg) throws ServletException, IOException {
+	private void handleLogin(HttpServletRequest req, HttpServletResponse resp, String errorMsg) throws ServletException, IOException {
 
 		Page page = newPage(req, resp,
 				"azkaban/webapp/servlet/velocity/login.vm");
@@ -95,38 +92,40 @@ public abstract class LoginAbstractAzkabanServlet extends
 	}
 
 	@Override
-	protected void doPost(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException {
+	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		if (hasParam(req, "action")) {
 			String action = getParam(req, "action");
 			if (action.equals("login")) {
 				handleLoginAction(req, resp);
-			} else {
+			} 
+			else {
 				Session session = getSessionFromRequest(req);
 				if (session == null) {
 					if (isAjaxCall(req)) {
-						String response = createJsonResponse("error",
-								"Invalid Session. Need to re-login", "login",
-								null);
+						String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
 						writeResponse(resp, response);
-					} else {
+					} 
+					else {
 						handleLogin(req, resp, "Enter username and password");
 					}
-				} else {
+				} 
+				else {
 					handlePost(req, resp, session);
 				}
 			}
-		} else {
+		} 
+		else {
 			Session session = getSessionFromRequest(req);
 			if (session == null) {
 				if (isAjaxCall(req)) {
-					String response = createJsonResponse("error",
-							"Invalid Session. Need to re-login", "login", null);
+					String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
 					writeResponse(resp, response);
-				} else {
+				} 
+				else {
 					handleLogin(req, resp, "Enter username and password");
 				}
-			} else {
+			} 
+			else {
 				handlePost(req, resp, session);
 			}
 		}
@@ -142,7 +141,8 @@ public abstract class LoginAbstractAzkabanServlet extends
 			User user = null;
 			try {
 				user = manager.getUser(username, password);
-			} catch (UserManagerException e) {
+			} 
+			catch (UserManagerException e) {
 				handleLogin(req, resp, e.getMessage());
 				return;
 			}
@@ -152,26 +152,25 @@ public abstract class LoginAbstractAzkabanServlet extends
 			resp.addCookie(new Cookie(SESSION_ID_NAME, randomUID));
 			getApplication().getSessionCache().addSession(session);
 			handleGet(req, resp, session);
-		} else {
+		} 
+		else {
 			if (isAjaxCall(req)) {
-				String response = createJsonResponse("error",
-						"Incorrect Login.", "login", null);
+				String response = createJsonResponse("error", "Incorrect Login.", "login", null);
 				writeResponse(resp, response);
-			} else {
+			} 
+			else {
 				handleLogin(req, resp, "Enter username and password");
 			}
 		}
 	}
 	
-	protected void writeResponse(HttpServletResponse resp, String response)
-			throws IOException {
+	protected void writeResponse(HttpServletResponse resp, String response) throws IOException {
 		Writer writer = resp.getWriter();
 		writer.append(response);
 		writer.flush();
 	}
 
-	protected boolean isAjaxCall(HttpServletRequest req)
-			throws ServletException {
+	protected boolean isAjaxCall(HttpServletRequest req) throws ServletException {
 		String value = req.getHeader("X-Requested-With");
 		if (value != null) {
 			logger.info("has X-Requested-With " + value);
@@ -191,9 +190,7 @@ public abstract class LoginAbstractAzkabanServlet extends
 	 * @throws ServletException
 	 * @throws IOException
 	 */
-	protected abstract void handleGet(HttpServletRequest req,
-			HttpServletResponse resp, Session session) throws ServletException,
-			IOException;
+	protected abstract void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException;
 
 	/**
 	 * The post request is handed off to the implementor after the user is
@@ -205,7 +202,5 @@ public abstract class LoginAbstractAzkabanServlet extends
 	 * @throws ServletException
 	 * @throws IOException
 	 */
-	protected abstract void handlePost(HttpServletRequest req,
-			HttpServletResponse resp, Session session) throws ServletException,
-			IOException;
+	protected abstract void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException;
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 8baf2cd..c2cc965 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -73,8 +73,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	@Override
 	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
 		if ( hasParam(req, "project") ) {
-			if (hasParam(req, "json")) {
-				handleJSONAction(req, resp, session);
+			if (hasParam(req, "ajax")) {
+				handleAJAXAction(req, resp, session);
 			}
 			else if (hasParam(req, "permissions")) {
 				handlePermissionPage(req, resp, session);
@@ -116,7 +116,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-	private void handleJSONAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
 		String projectName = getParam(req, "project");
 		User user = session.getUser();
 		
@@ -132,47 +132,47 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 
-		String jsonName = getParam(req, "json");
-		if (jsonName.equals("fetchflowjobs")) {
-			if (handleJsonPermission(project, user, Type.READ, ret)) {
-				jsonFetchFlow(project, ret, req, resp);
+		String ajaxName = getParam(req, "ajax");
+		if (ajaxName.equals("fetchflowjobs")) {
+			if (handleAjaxPermission(project, user, Type.READ, ret)) {
+				ajaxFetchFlow(project, ret, req, resp);
 			}
 		}
-		else if (jsonName.equals("fetchflowgraph")) {
-			if (handleJsonPermission(project, user, Type.READ, ret)) {
-				jsonFetchFlowGraph(project, ret, req, resp);
+		else if (ajaxName.equals("fetchflowgraph")) {
+			if (handleAjaxPermission(project, user, Type.READ, ret)) {
+				ajaxFetchFlowGraph(project, ret, req, resp);
 			}
 		}
-		else if (jsonName.equals("fetchprojectflows")) {
-			if (handleJsonPermission(project, user, Type.READ, ret)) {
-				jsonFetchProjectFlows(project, ret, req, resp);
+		else if (ajaxName.equals("fetchprojectflows")) {
+			if (handleAjaxPermission(project, user, Type.READ, ret)) {
+				ajaxFetchProjectFlows(project, ret, req, resp);
 			}
 		}
-		else if (jsonName.equals("changeDescription")) {
-			if (handleJsonPermission(project, user, Type.WRITE, ret)) {
-				jsonChangeDescription(project, ret, req, resp);
+		else if (ajaxName.equals("changeDescription")) {
+			if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
+				ajaxChangeDescription(project, ret, req, resp);
 			}
 		}
-		else if (jsonName.equals("getPermissions")) {
-			if (handleJsonPermission(project, user, Type.READ, ret)) {
-				jsonGetPermissions(project, ret);
+		else if (ajaxName.equals("getPermissions")) {
+			if (handleAjaxPermission(project, user, Type.READ, ret)) {
+				ajaxGetPermissions(project, ret);
 			}
 		}
-		else if (jsonName.equals("changeUserPermission")) {
-			if (handleJsonPermission(project, user, Type.ADMIN, ret)) {
-				jsonChangePermissions(project, ret, req);
+		else if (ajaxName.equals("changeUserPermission")) {
+			if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
+				ajaxChangePermissions(project, ret, req);
 			}
 		}
-		else if (jsonName.equals("addUserPermission")) {
-			if (handleJsonPermission(project, user, Type.ADMIN, ret)) {
-				jsonAddUserPermission(project, ret, req);
+		else if (ajaxName.equals("addUserPermission")) {
+			if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
+				ajaxAddUserPermission(project, ret, req);
 			}
 		}
 		
 		this.writeJSON(resp, ret);
 	}
 	
-	private boolean handleJsonPermission(Project project, User user, Type type, Map<String, Object> ret) {
+	private boolean handleAjaxPermission(Project project, User user, Type type, Map<String, Object> ret) {
 		if (project.hasPermission(user, type)) {
 			return true;
 		}
@@ -181,7 +181,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		return false;
 	}
 	
-	private void jsonChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+	private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
 		String description = getParam(req, "description");
 		project.setDescription(description);
 		
@@ -192,7 +192,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-	private void jsonFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+	private void ajaxFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
 		ArrayList<Map<String,Object>> flowList = new ArrayList<Map<String,Object>>();
 		for (Flow flow: project.getFlows()) {
 			HashMap<String, Object> flowObj = new HashMap<String, Object>();
@@ -203,7 +203,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		ret.put("flows", flowList); 
 	}
 	
-	private void jsonFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+	private void ajaxFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
 		String flowId = getParam(req, "flow");
 		Flow flow = project.getFlow(flowId);
 		
@@ -215,9 +215,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 			nodeObj.put("x", node.getPosition().getX());
 			nodeObj.put("y", node.getPosition().getY());
 			nodeObj.put("level", node.getLevel());
-			if (node.getState() != Node.State.WAITING) {
-				nodeObj.put("state", node.getState());
-			}
+
 			nodeList.add(nodeObj);
 		}
 		
@@ -253,7 +251,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		ret.put("edges", edgeList);
 	}
 	
-	private void jsonFetchFlow(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+	private void ajaxFetchFlow(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
 		String flowId = getParam(req, "flow");
 		Flow flow = project.getFlow(flowId);
 
@@ -291,7 +289,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		ret.put("nodes", nodeList);
 	}
 	
-	private void jsonAddUserPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+	private void ajaxAddUserPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
 		String username = getParam(req, "username");
 		UserManager userManager = getApplication().getUserManager();
 		if (!userManager.validateUser(username)) {
@@ -329,7 +327,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	
-	private void jsonChangePermissions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+	private void ajaxChangePermissions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
 		boolean admin = Boolean.parseBoolean(getParam(req, "permissions[admin]"));
 		boolean read = Boolean.parseBoolean(getParam(req, "permissions[read]"));
 		boolean write = Boolean.parseBoolean(getParam(req, "permissions[write]"));
@@ -359,7 +357,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-	private void jsonGetPermissions(Project project, HashMap<String, Object> ret) {
+	private void ajaxGetPermissions(Project project, HashMap<String, Object> ret) {
 		ArrayList<HashMap<String, Object>> permissions = new ArrayList<HashMap<String, Object>>();
 		for(Pair<String, Permission> perm: project.getUserPermissions()) {
 			HashMap<String, Object> permObj = new HashMap<String, Object>();
@@ -547,8 +545,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		page.render();
 	}
 
-	private void handleCreate(HttpServletRequest req, HttpServletResponse resp,
-			Session session) throws ServletException {
+	private void handleCreate(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
 
 		String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
 		String projectDescription = hasParam(req, "description") ? getParam(req, "description") : null;
@@ -582,9 +579,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart,
-			Session session) throws ServletException, IOException {
-		
+	private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart, Session session) throws ServletException, IOException {
 		User user = session.getUser();
 		String projectName = (String) multipart.get("project");
 		FileItem item = (FileItem) multipart.get("file");
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5ba18ad..e880ce6 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -78,6 +78,11 @@
 		<ul id="jobMenu" class="contextMenu">  
 			<li class="open"><a href="#open">Open...</a></li>
 			<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
+			<li class="disable"><a href="#disable">Disable</a></li>
+			<li class="disableancestors"><a href="#disableancestors">Disable All Ancestors</a></li>
+			<li class="disabledecendents"><a href="#disabledecendents">Disable Decendents</a></li>
+			<li class="enableancestors"><a href="#enableancestors">Enable All Ancestors</a></li>
+			<li class="disabledecendents"><a href="#disabledecendents">Disable Decendents</a></li>
 		</ul>
 
 		</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index a806c9f..df2c192 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -46,8 +46,8 @@
 							<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
 						</div>
 						
-						<a id="execute-btn" class="btn1" href="#">Execute Now</a>
-						<a id="execute-btn" class="btn2" href="#">Execute Custom</a>
+						<div id="executebtn" class="btn1">Execute</div>
+						<div id="schedulebtn" class="btn2">Schedule Flow</div>
 					</div>
 					
 					<div id="headertabs" class="headertabs">
@@ -81,6 +81,8 @@
 		<ul id="jobMenu" class="contextMenu">  
 			<li class="open"><a href="#open">Open...</a></li>
 			<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
+			<li class="enable separator"><a href="#enable">Enable</a></li>
+			<li class="disable"><a href="#disable">Disable</a></li>
 		</ul>
 
 		</div>
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index e7c81f7..309536b 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1112,6 +1112,19 @@ tr:hover td {
 	bottom: 90px;
 }
 
+#nodeOptions {
+	position: absolute;
+	bottom: 10px;
+}
+
+#flowProperties {
+	position: absolute:
+	height: 100px;
+	width: 100%;
+	background-color: #000;
+	top: 100px;
+}
+
 #filter {
 	width: 100%;
 }
@@ -1136,6 +1149,10 @@ tr:hover td {
 	color: #EEE;
 }
 
+#list ul li.nodedisabled {
+	opacity: 0.3;
+}
+
 #list ul li a {
 	font-size: 10pt;
 	margin-left: 5px;
@@ -1195,6 +1212,10 @@ svg .edge:hover {
 	stroke-width: 4;
 }
 
+svg .node.disabled {
+	opacity: 0.3;
+}
+
 svg .node .backboard {
 	fill: #FFF;
 	opacity: 0.05;
@@ -1209,7 +1230,7 @@ svg .node:hover .backboard {
 }
 
 svg .node circle {
-	fill: #FFF;
+	fill: #888;
 	stroke: #777;
 	stroke-width: 2;
 }
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 6800733..c2eaf7e 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -9,12 +9,54 @@ var handleJobMenuClick = function(action, el, pos) {
 	else if(action == "openwindow") {
 		window.open(requestURL);
 	}
+	else if(action == "disable") {
+		var oldDisabled = graphModel.get("disabled");
+		
+		var newDisabled = {};
+		// Copy disabled list
+		if (oldDisabled) {
+			for(var id in oldDisabled) {
+			    if(oldDisabled.hasOwnProperty(id)) {
+			    	var disabled = (oldDisabled[id]);
+			    	if (disabled) {
+			    		newDisabled[id]=true;
+			    	}
+			    }
+			}
+		}
+		
+		newDisabled[jobid] = true;
+		graphModel.set({disabled: newDisabled});
+	}
+	else if(action == "enable") {
+		var oldDisabled = graphModel.get("disabled");
+		// Copy disabled list
+		if (oldDisabled) {
+			var newDisabled = {};
+			for(var id in oldDisabled) {
+			    if(oldDisabled.hasOwnProperty(id)) {
+			    	var disabled = (oldDisabled[id]);
+			    	if (disabled) {
+			    		newDisabled[id]=true;
+			    	}
+			    }
+			}
+			
+			if( oldDisabled[jobid]) {
+				newDisabled[jobid] = false;
+				graphModel.set({disabled: newDisabled});
+			}
+		}
 
+	}
 }
 
 function hasClass(el, name) 
 {
 	var classes = el.getAttribute("class");
+	if (classes == null) {
+		return false;
+	}
    return new RegExp('(\\s|^)'+name+'(\\s|$)').test(classes);
 }
 
@@ -79,6 +121,7 @@ azkaban.JobListView = Backbone.View.extend({
 	},
 	initialize: function(settings) {
 		this.model.bind('change:selected', this.handleSelectionChange, this);
+		this.model.bind('change:disabled', this.handleDisabledChange, this);
 		this.model.bind('change:graph', this.render, this);
 	},
 	filterJobs: function(self) {
@@ -199,6 +242,20 @@ azkaban.JobListView = Backbone.View.extend({
 			this.model.set({"selected": jobid});
 		}
 	},
+	handleDisabledChange: function(evt) {
+		var disabledMap = this.model.get("disabled");
+		for(var id in disabledMap) {
+		    if(disabledMap.hasOwnProperty(id)) {
+		    	var disabled = (disabledMap[id]);
+		    	if (disabled) {
+		    		$(this.listNodes[id]).addClass("nodedisabled");
+		    	}
+		    	else {
+		    		$(this.listNodes[id]).removeClass("nodedisabled");
+		    	}
+		    }
+		}
+	},
 	handleSelectionChange: function(evt) {
 		if (!this.model.hasChanged("selected")) {
 			return;
@@ -227,6 +284,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 	},
 	initialize: function(settings) {
 		this.model.bind('change:selected', this.changeSelected, this);
+		this.model.bind('change:disabled', this.handleDisabledChange, this);
 		this.model.bind('change:graph', this.render, this);
 		this.model.bind('resetPanZoom', this.resetPanZoom, this);
 		
@@ -367,6 +425,24 @@ azkaban.SvgGraphView = Backbone.View.extend({
 			self.mainG.appendChild(line);
 		}
 	},
+	handleDisabledChange: function(evt) {
+		var disabledMap = this.model.get("disabled");
+		for(var id in disabledMap) {
+		    if(disabledMap.hasOwnProperty(id)) {
+		    	var disabled = disabledMap[id];
+		    	this.nodes[id].disabled = disabled;
+		    	var g = document.getElementById(id);
+		    	
+		    	if (disabled) {
+		    		this.nodes[id].disabled = disabled;
+					addClass(g, "disabled");
+		    	}
+		    	else {
+		    		removeClass(g, "disabled");
+		    	}
+		    }
+		}
+	},
 	drawNode: function(self, node, bounds) {
 		var svg = self.svgGraph;
 		var svgns = self.svgns;
@@ -439,7 +515,6 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		var bounds = this.graphBounds;
 		$("#svgGraph").svgNavigate("transformToBox", {x: bounds.minX, y: bounds.minY, width: (bounds.maxX - bounds.minX), height: (bounds.maxY - bounds.minY) });
 	}
-	
 });
 
 var graphModel;
@@ -447,6 +522,7 @@ azkaban.GraphModel = Backbone.Model.extend({});
 
 $(function() {
 	var selected;
+	
 	if (window.location.hash) {
 		var hash = window.location.hash;
 		if (hash == "#jobslist") {
@@ -470,7 +546,7 @@ $(function() {
 
 	$.get(
 	      requestURL,
-	      {"project": projectName, "json":"fetchflowgraph", "flow":flowName},
+	      {"project": projectName, "ajax":"fetchflowgraph", "flow":flowName},
 	      function(data) {
 	          console.log("data fetched");
 	          graphModel.set({data: data});
@@ -478,4 +554,17 @@ $(function() {
 	      },
 	      "json"
 	    );
+	    
+	$("#executebtn").click( function() {
+		var executeURL = contextURL + "/executor";
+		$.get(
+			executeURL,
+			{"project": projectName, "ajax":"executeFlow", "flow":flowName, "disabled":graphModel.get("disabled")},
+			function(data) {
+				alert("call success");
+			},
+			"json"
+		);
+		
+	});
 });
diff --git a/src/web/js/azkaban.main.view.js b/src/web/js/azkaban.main.view.js
index 49b9d25..10eff92 100644
--- a/src/web/js/azkaban.main.view.js
+++ b/src/web/js/azkaban.main.view.js
@@ -42,7 +42,7 @@ azkaban.ProjectTableView= Backbone.View.extend({
 	    
 	    $.get(
 	      requestURL,
-	      {"project": targetId, "json":"fetchprojectflows"},
+	      {"project": targetId, "ajax":"fetchprojectflows"},
 	      function(data) {
 	        console.log("Success");
 	        target.loaded = true;
diff --git a/src/web/js/azkaban.permission.view.js b/src/web/js/azkaban.permission.view.js
index d86bf1c..d6c0bbb 100644
--- a/src/web/js/azkaban.permission.view.js
+++ b/src/web/js/azkaban.permission.view.js
@@ -138,7 +138,7 @@ azkaban.ChangePermissionView= Backbone.View.extend({
 
   	$.get(
 	      requestURL,
-	      {"project": projectId, "username": userID, "json":command, "permissions": this.permission},
+	      {"project": projectId, "username": userID, "ajax":command, "permissions": this.permission},
 	      function(data) {
 	      	  console.log("Output");
 	      	  if (data.error) {
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index b929c96..0365e80 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -86,7 +86,7 @@ azkaban.FlowTableView= Backbone.View.extend({
 	    
 	    $.get(
 	      requestURL,
-	      {"project": projectId, "json":"fetchflowjobs", "flow":targetId},
+	      {"project": projectId, "ajax":"fetchflowjobs", "flow":targetId},
 	      function(data) {
 	        console.log("Success");
 	        target.loaded = true;
@@ -172,7 +172,7 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
 
           $.get(
 		      requestURL,
-		      {"project": projectId, "json":"changeDescription", "description":newText},
+		      {"project": projectId, "ajax":"changeDescription", "description":newText},
 		      function(data) {
 				if (data.error) {
 					alert(data.error);