azkaban-memoizeit

Rewriting flow display code. Having separate code for regular

12/12/2013 2:33:42 AM

Changes

unit/build.xml 13(+12 -1)

Details

diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 15a8bf8..3fff9c2 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -62,11 +62,11 @@ public interface ConnectorParams {
 	public static final int NODE_END_INDEX = 3;
 
 	public static final String UPDATE_TIME_LIST_PARAM = "updatetime";
-	public static final String EXEC_ID_LIST_PARAM = "execid";
+	public static final String EXEC_ID_LIST_PARAM = "executionId";
 	
 	public static final String FORCED_FAILED_MARKER = ".failed";
 	
-	public static final String UPDATE_MAP_EXEC_ID = "execId";
+	public static final String UPDATE_MAP_EXEC_ID = "executionId";
 	public static final String UPDATE_MAP_JOBID = "jobId";
 	public static final String UPDATE_MAP_UPDATE_TIME = "updateTime";
 	public static final String UPDATE_MAP_STATUS = "status";
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 9ee6aa2..81ae8aa 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -19,13 +19,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import azkaban.flow.Flow;
-import azkaban.flow.FlowProps;
 import azkaban.project.Project;
-import azkaban.utils.JSONUtils;
+import azkaban.utils.TypedMapWrapper;
 
 public class ExecutableFlow extends ExecutableFlowBase {
 	public static final String EXECUTIONID_PARAM = "executionId";
@@ -192,30 +192,20 @@ public class ExecutableFlow extends ExecutableFlowBase {
 	}
 	
 	@Override
-	@SuppressWarnings("unchecked")
-	public void fillExecutableFromMapObject(Map<String, Object> flowObj) {
+	public void fillExecutableFromMapObject(TypedMapWrapper<String, Object> flowObj) {
 		super.fillExecutableFromMapObject(flowObj);
 		
-		this.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
-		this.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
-
-		this.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
-		if (flowObj.containsKey(SCHEDULEID_PARAM)) {
-			this.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
-		}
-		
-		if (flowObj.containsKey(SUBMITUSER_PARAM)) {
-			this.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
-		}
-		else {
-			this.submitUser = null;
-		}
-		this.version = (Integer)flowObj.get(VERSION_PARAM);
-		
-		this.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
+		this.executionId = flowObj.getInt(EXECUTIONID_PARAM);
+		this.executionPath = flowObj.getString(EXECUTIONPATH_PARAM);
+
+		this.projectId = flowObj.getInt(PROJECTID_PARAM);
+		this.scheduleId = flowObj.getInt(SCHEDULEID_PARAM);
+		this.submitUser = flowObj.getString(SUBMITUSER_PARAM);
+		this.version = flowObj.getInt(VERSION_PARAM);
+		this.submitTime = flowObj.getLong(SUBMITTIME_PARAM);
 		
 		if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
-			this.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
+			this.executionOptions = ExecutionOptions.createFromObject(flowObj.getObject(EXECUTIONOPTIONS_PARAM));
 		}
 		else {
 			// for backwards compatibility should remove in a few versions.
@@ -223,7 +213,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
 		}
 		
 		if(flowObj.containsKey(PROXYUSERS_PARAM)) {
-			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get(PROXYUSERS_PARAM);
+			List<String> proxyUserList = flowObj.<String>getList(PROXYUSERS_PARAM);
 			this.addAllProxyUsers(proxyUserList);
 		}
 	}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 798f40a..040888b 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -27,6 +27,7 @@ import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
 import azkaban.project.Project;
+import azkaban.utils.TypedMapWrapper;
 
 public class ExecutableFlowBase extends ExecutableNode {
 	public static final String FLOW_ID_PARAM = "flowId";
@@ -206,24 +207,23 @@ public class ExecutableFlowBase extends ExecutableNode {
 		flowObjMap.put(PROPERTIES_PARAM, props);
 	}
 
-	/**
-	 * Using the parameters in the map created from a json file, fill the results of this node
-	 */
-	@SuppressWarnings("unchecked")
-	public void fillExecutableFromMapObject(Map<String,Object> flowObjMap) {
+	@Override
+	public void fillExecutableFromMapObject(TypedMapWrapper<String,Object> flowObjMap) {
 		super.fillExecutableFromMapObject(flowObjMap);
 		
-		this.flowId = (String)flowObjMap.get(FLOW_ID_PARAM);
+		this.flowId = flowObjMap.getString(FLOW_ID_PARAM);
+		List<Object> nodes = flowObjMap.<Object>getList(NODES_PARAM);
 		
-		List<Object> nodes = (List<Object>)flowObjMap.get(NODES_PARAM);
 		if (nodes != null) {
 			for (Object nodeObj: nodes) {
+				@SuppressWarnings("unchecked")
 				Map<String,Object> nodeObjMap = (Map<String,Object>)nodeObj;
+				TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(nodeObjMap);
 				
-				String type = (String)nodeObjMap.get(TYPE_PARAM);
-				if (type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
+				String type = wrapper.getString(TYPE_PARAM);
+				if (type != null && type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
 					ExecutableFlowBase exFlow = new ExecutableFlowBase();
-					exFlow.fillExecutableFromMapObject(nodeObjMap);
+					exFlow.fillExecutableFromMapObject(wrapper);
 					exFlow.setParentFlow(this);
 					
 					executableNodes.put(exFlow.getId(), exFlow);
@@ -238,8 +238,9 @@ public class ExecutableFlowBase extends ExecutableNode {
 			}
 		}
 		
-		List<Object> properties = (List<Object>)flowObjMap.get(PROPERTIES_PARAM);
+		List<Object> properties = flowObjMap.<Object>getList(PROPERTIES_PARAM);
 		for (Object propNode : properties) {
+			@SuppressWarnings("unchecked")
 			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
 			String source = (String)fprop.get("source");
 			String inheritedSource = (String)fprop.get("inherited");
@@ -278,22 +279,21 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return updateData;
 	}
 	
-	public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
+	public void applyUpdateObject(TypedMapWrapper<String, Object> updateData, List<ExecutableNode> updatedNodes) {
 		super.applyUpdateObject(updateData);
 		
 		if (updatedNodes != null) {
 			updatedNodes.add(this);
 		}
-		
-		@SuppressWarnings("unchecked")
-		List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+
+		List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.<Map<String,Object>>getList(NODES_PARAM);
 		if (nodes != null) {
 			for (Map<String,Object> node: nodes) {
-	
-				String id = (String)node.get(ID_PARAM);
+				TypedMapWrapper<String,Object> nodeWrapper = new TypedMapWrapper<String,Object>(node);
+				String id = nodeWrapper.getString(ID_PARAM);
 				if (id == null) {
 					// Legacy case
-					id = (String)node.get("jobId");				
+					id = nodeWrapper.getString("jobId");				
 				}
 	
 				ExecutableNode exNode = executableNodes.get(id);
@@ -302,18 +302,24 @@ public class ExecutableFlowBase extends ExecutableNode {
 				}
 				
 				if (exNode instanceof ExecutableFlowBase) {
-					((ExecutableFlowBase)exNode).applyUpdateObject(node, updatedNodes);
+					((ExecutableFlowBase)exNode).applyUpdateObject(nodeWrapper, updatedNodes);
 				}
 				else {
-					exNode.applyUpdateObject(node);
+					exNode.applyUpdateObject(nodeWrapper);
 				}
 			}
 		}
-		
 	}
 	
+	public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
+		TypedMapWrapper<String, Object> typedMapWrapper = new TypedMapWrapper<String,Object>(updateData);
+		applyUpdateObject(typedMapWrapper, updatedNodes);
+	}
+	
+	@Override
 	public void applyUpdateObject(Map<String, Object> updateData) {
-		applyUpdateObject(updateData, null);
+		TypedMapWrapper<String, Object> typedMapWrapper = new TypedMapWrapper<String,Object>(updateData);
+		applyUpdateObject(typedMapWrapper, null);
 	}
 	
 	public void reEnableDependents(ExecutableNode ... nodes) {
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 79942e9..0683c38 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -2,15 +2,16 @@ package azkaban.executor;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import azkaban.flow.Node;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
+import azkaban.utils.TypedMapWrapper;
 
 /**
  * Base Executable that nodes and flows are based.
@@ -291,38 +292,30 @@ public class ExecutableNode {
 	}
 	
 	@SuppressWarnings("unchecked")
-	public void fillExecutableFromMapObject(Map<String,Object> objMap) {
-		this.id = (String)objMap.get(ID_PARAM);
-		this.status = Status.valueOf((String)objMap.get(STATUS_PARAM));
-		this.startTime = JSONUtils.getLongFromObject(objMap.get(STARTTIME_PARAM));
-		this.endTime = JSONUtils.getLongFromObject(objMap.get(ENDTIME_PARAM));
-		this.updateTime = JSONUtils.getLongFromObject(objMap.get(UPDATETIME_PARAM));
-		this.type = (String)objMap.get(TYPE_PARAM);
-		this.attempt = (Integer)objMap.get(ATTEMPT_PARAM);
+	public void fillExecutableFromMapObject(TypedMapWrapper<String, Object> wrappedMap) {
+		this.id = wrappedMap.getString(ID_PARAM);
+		this.type = wrappedMap.getString(TYPE_PARAM);
+		this.status = Status.valueOf(wrappedMap.getString(STATUS_PARAM));
+		this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
+		this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
+		this.updateTime = wrappedMap.getLong(UPDATETIME_PARAM);
+		this.attempt = wrappedMap.getInt(ATTEMPT_PARAM, 0);
+
+		this.inNodes = new HashSet<String>();
+		this.inNodes.addAll(wrappedMap.getStringCollection(INNODES_PARAM, Collections.<String>emptySet()));
 		
-		if (objMap.containsKey(INNODES_PARAM)) {
-			this.inNodes = new HashSet<String>();
-			this.inNodes.addAll((Collection<String>)objMap.get(INNODES_PARAM));
-		}
+		this.outNodes = new HashSet<String>();
+		this.outNodes.addAll(wrappedMap.getStringCollection(OUTNODES_PARAM, Collections.<String>emptySet()));
 		
-		if (objMap.containsKey(OUTNODES_PARAM)) {
-			this.outNodes = new HashSet<String>();
-			this.outNodes.addAll((Collection<String>)objMap.get(OUTNODES_PARAM));
-		}
-	
-		if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
-			this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
-		}
+		this.propsSource = wrappedMap.getString(PROPS_SOURCE_PARAM);
+		this.jobSource = wrappedMap.getString(JOB_SOURCE_PARAM);
 		
-		if (objMap.containsKey(JOB_SOURCE_PARAM)) {
-			this.jobSource = (String)objMap.get(JOB_SOURCE_PARAM);
+		Map<String, String> outputProps = wrappedMap.<String,String>getMap(OUTPUT_PROPS_PARAM);
+		if (outputProps != null) {
+			this.outputProps = new Props(null, outputProps);
 		}
 		
-		if (objMap.containsKey(OUTPUT_PROPS_PARAM)) {
-			this.outputProps = new Props(null, (Map<String,String>)objMap.get(OUTPUT_PROPS_PARAM));
-		}
-		
-		Collection<Object> pastAttempts = (Collection<Object>)objMap.get(PASTATTEMPTS_PARAM);
+		Collection<Object> pastAttempts = wrappedMap.<Object>getCollection(PASTATTEMPTS_PARAM);
 		if (pastAttempts!=null) {
 			ArrayList<ExecutionAttempt> attempts = new ArrayList<ExecutionAttempt>();
 			for (Object attemptObj: pastAttempts) {
@@ -334,6 +327,11 @@ public class ExecutableNode {
 		}
 	}
 
+	public void fillExecutableFromMapObject(Map<String,Object> objMap) {
+		TypedMapWrapper<String, Object> wrapper = new TypedMapWrapper<String, Object>(objMap);
+		fillExecutableFromMapObject(wrapper);
+	}
+
 	public Map<String, Object> toUpdateObject() {
 		Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
 		updatedNodeMap.put(ID_PARAM, getId());
@@ -355,28 +353,25 @@ public class ExecutableNode {
 		return updatedNodeMap;
 	}
 	
-	@SuppressWarnings("unchecked")
-	public void applyUpdateObject(Map<String, Object> updateData) {
-		if (updateData.containsKey(STATUS_PARAM)) {
-			this.status = Status.fromInteger((Integer)updateData.get(STATUS_PARAM));
-		}
-		if (updateData.containsKey(STARTTIME_PARAM)) {
-			this.startTime = JSONUtils.getLongFromObject(updateData.get(STARTTIME_PARAM));
-		}
-		if (updateData.containsKey(UPDATETIME_PARAM)) {
-			this.updateTime = JSONUtils.getLongFromObject(updateData.get(UPDATETIME_PARAM));
-		}
-		if (updateData.containsKey(ENDTIME_PARAM)) {
-			this.endTime = JSONUtils.getLongFromObject(updateData.get(ENDTIME_PARAM));
-		}
+	public void applyUpdateObject(TypedMapWrapper<String, Object> updateData) {
+		this.status = Status.fromInteger(updateData.getInt(STATUS_PARAM, this.status.getNumVal()));
+		this.startTime = updateData.getLong(STARTTIME_PARAM);
+		this.updateTime = updateData.getLong(UPDATETIME_PARAM);
+		this.endTime = updateData.getLong(ENDTIME_PARAM);
 		
 		if (updateData.containsKey(ATTEMPT_PARAM)) {
-			attempt = (Integer)updateData.get(ATTEMPT_PARAM);
+			attempt = updateData.getInt(ATTEMPT_PARAM);
 			if (attempt > 0) {
-				updatePastAttempts((List<Object>)updateData.get(PASTATTEMPTS_PARAM));
+				updatePastAttempts(
+						updateData.<Object>getList(PASTATTEMPTS_PARAM, Collections.<Object>emptyList()));
 			}
 		}
 	}
+
+	public void applyUpdateObject(Map<String, Object> updateData) {
+		TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(updateData);
+		applyUpdateObject(wrapper);
+	}
 	
 	public void killNode(long killTime) {
 		if (this.status == Status.DISABLED) {
@@ -417,4 +412,6 @@ public class ExecutableNode {
 			}
 		}
 	}
+	
+	
 }
diff --git a/src/java/azkaban/executor/ExecutionAttempt.java b/src/java/azkaban/executor/ExecutionAttempt.java
index 7da0623..7712010 100644
--- a/src/java/azkaban/executor/ExecutionAttempt.java
+++ b/src/java/azkaban/executor/ExecutionAttempt.java
@@ -3,7 +3,7 @@ package azkaban.executor;
 import java.util.HashMap;
 import java.util.Map;
 
-import azkaban.utils.JSONUtils;
+import azkaban.utils.TypedMapWrapper;
 
 public class ExecutionAttempt {
 	public static final String ATTEMPT_PARAM = "attempt";
@@ -49,10 +49,11 @@ public class ExecutionAttempt {
 	public static ExecutionAttempt fromObject(Object obj) {
 		@SuppressWarnings("unchecked")
 		Map<String, Object> map = (Map<String, Object>)obj;
-		int attempt = (Integer)map.get(ATTEMPT_PARAM);
-		long startTime = JSONUtils.getLongFromObject(map.get(STARTTIME_PARAM));
-		long endTime = JSONUtils.getLongFromObject(map.get(ENDTIME_PARAM));
-		Status status = Status.valueOf((String)map.get(STATUS_PARAM));
+		TypedMapWrapper<String, Object> wrapper = new TypedMapWrapper<String, Object>(map);
+		int attempt = wrapper.getInt(ATTEMPT_PARAM);
+		long startTime = wrapper.getLong(STARTTIME_PARAM);
+		long endTime = wrapper.getLong(ENDTIME_PARAM);
+		Status status = Status.valueOf(wrapper.getString(STATUS_PARAM));
 		
 		return new ExecutionAttempt(attempt, startTime, endTime, status);
 	}
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index cedc3f0..0148ab3 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -2,12 +2,15 @@ package azkaban.executor;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import azkaban.utils.TypedMapWrapper;
+
 /**
  * Execution options for submitted flows and scheduled flows
  */
@@ -179,50 +182,35 @@ public class ExecutionOptions {
 		}
 		
 		Map<String,Object> optionsMap = (Map<String,Object>)obj;
+		TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(optionsMap);
 		
 		ExecutionOptions options = new ExecutionOptions();
 		if (optionsMap.containsKey(FLOW_PARAMETERS)) {
 			options.flowParameters = new HashMap<String, String>();
-			options.flowParameters.putAll((Map<String,String>)optionsMap.get(FLOW_PARAMETERS));
+			options.flowParameters.putAll(wrapper.<String,String>getMap(FLOW_PARAMETERS));
 		}
 		// Failure notification
-		if (optionsMap.containsKey(NOTIFY_ON_FIRST_FAILURE)) {
-			options.notifyOnFirstFailure = (Boolean)optionsMap.get(NOTIFY_ON_FIRST_FAILURE);
-		}
-		if (optionsMap.containsKey(NOTIFY_ON_LAST_FAILURE)) {
-			options.notifyOnLastFailure = (Boolean)optionsMap.get(NOTIFY_ON_LAST_FAILURE);
-		}
-		if (optionsMap.containsKey(CONCURRENT_OPTION)) {
-			options.concurrentOption = (String)optionsMap.get(CONCURRENT_OPTION);
-		}
-		if (optionsMap.containsKey(DISABLE)) {
-			options.initiallyDisabledJobs = new HashSet<String>((Collection<String>)optionsMap.get(DISABLE));
+		options.notifyOnFirstFailure = wrapper.getBool(NOTIFY_ON_FIRST_FAILURE, options.notifyOnFirstFailure);
+		options.notifyOnLastFailure = wrapper.getBool(NOTIFY_ON_LAST_FAILURE, options.notifyOnLastFailure);
+		options.concurrentOption = wrapper.getString(CONCURRENT_OPTION, options.concurrentOption);
+		
+		if (wrapper.containsKey(DISABLE)) {
+			options.initiallyDisabledJobs = new HashSet<String>(wrapper.<String>getCollection(DISABLE));
 		}
 		
 		// Failure action
-		if (optionsMap.containsKey(FAILURE_ACTION)) {
-			options.failureAction = FailureAction.valueOf((String)optionsMap.get(FAILURE_ACTION));
-		}
-		options.pipelineLevel = (Integer)optionsMap.get(PIPELINE_LEVEL);
-		options.pipelineExecId = (Integer)optionsMap.get(PIPELINE_EXECID);
-		options.queueLevel = (Integer)optionsMap.get(QUEUE_LEVEL);
+		options.failureAction = FailureAction.valueOf(wrapper.getString(FAILURE_ACTION, options.failureAction.toString()));
+		options.pipelineLevel = wrapper.getInt(PIPELINE_LEVEL, options.pipelineLevel);
+		options.pipelineExecId = wrapper.getInt(PIPELINE_EXECID, options.pipelineExecId);
+		options.queueLevel = wrapper.getInt(QUEUE_LEVEL, options.queueLevel);
+
 		
 		// Success emails
-		if (optionsMap.containsKey(SUCCESS_EMAILS)) {
-			options.setSuccessEmails((List<String>)optionsMap.get(SUCCESS_EMAILS));
-		}
-		// Failure emails
-		if (optionsMap.containsKey(FAILURE_EMAILS)) {
-			options.setFailureEmails((List<String>)optionsMap.get(FAILURE_EMAILS));
-		}
+		options.setSuccessEmails(wrapper.<String>getList(SUCCESS_EMAILS, Collections.<String>emptyList()));
+		options.setFailureEmails(wrapper.<String>getList(FAILURE_EMAILS, Collections.<String>emptyList()));
 		
-		if (optionsMap.containsKey(SUCCESS_EMAILS_OVERRIDE)) {
-			options.setSuccessEmailsOverridden((Boolean)optionsMap.get(SUCCESS_EMAILS_OVERRIDE));
-		}
-		
-		if (optionsMap.containsKey(FAILURE_EMAILS_OVERRIDE)) {
-			options.setFailureEmailsOverridden((Boolean)optionsMap.get(FAILURE_EMAILS_OVERRIDE));
-		}
+		options.setSuccessEmailsOverridden(wrapper.getBool(SUCCESS_EMAILS_OVERRIDE, false));
+		options.setFailureEmailsOverridden(wrapper.getBool(FAILURE_EMAILS_OVERRIDE, false));
 		
 		return options;
 	}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 29a93a3..0faf4ab 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -826,7 +826,7 @@ public class ExecutorManager {
 		
 		Pair<ExecutionReference, ExecutableFlow> refPair = this.runningFlows.get(execId);
 		if (refPair == null) {
-			throw new ExecutorManagerException("No running flow found with the execution id.");
+			throw new ExecutorManagerException("No running flow found with the execution id. Removing " + execId);
 		}
 		
 		ExecutionReference ref = refPair.getFirst();
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index d354e65..d427ea5 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -379,7 +379,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		
 		// if the main flow is not the parent, then we'll create a composite key for flowID
 		if (flow != node.getParentFlow()) {
-			flowId = flow.getId() + "+" + node.getParentFlow().getPrintableId("+");
+			flowId = node.getParentFlow().getNestedId();
 		}
 		
 		QueryRunner runner = createQueryRunner();
@@ -426,6 +426,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 					node.getStatus().getNumVal(), 
 					outputParam,
 					node.getExecutableFlow().getExecutionId(),
+					node.getParentFlow().getNestedId(),
 					node.getId(),
 					node.getAttempt());
 		} catch (SQLException e) {
diff --git a/src/java/azkaban/utils/TypedMapWrapper.java b/src/java/azkaban/utils/TypedMapWrapper.java
new file mode 100644
index 0000000..cce512b
--- /dev/null
+++ b/src/java/azkaban/utils/TypedMapWrapper.java
@@ -0,0 +1,141 @@
+package azkaban.utils;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class TypedMapWrapper<K, V> {
+	private Map<K,V> map;
+	public TypedMapWrapper(Map<K, V> map) {
+		this.map = map;
+	}
+	
+	public String getString(K key) {
+		return getString(key, null);
+	}
+	
+	public String getString(K key, String defaultVal) {
+		Object obj = map.get(key);
+		if (obj == null) {
+			return defaultVal;
+		}
+		if (obj instanceof String) {
+			return (String)obj;
+		}
+	
+		return obj.toString();
+	}
+	
+	public Boolean getBool(K key, Boolean defaultVal) {
+		Object obj = map.get(key);
+		if (obj == null) {
+			return defaultVal;
+		}
+		
+		return (Boolean)obj;
+	}
+	
+	public Integer getInt(K key) {
+		return getInt(key, -1);
+	}
+	
+	public Integer getInt(K key, Integer defaultVal) {
+		Object obj = map.get(key);
+		if (obj == null) {
+			return defaultVal;
+		}
+		if (obj instanceof Integer) {
+			return (Integer)obj;
+		}
+		else if (obj instanceof String) {
+			return Integer.valueOf((String)obj);
+		}
+		else {
+			return defaultVal;
+		}
+	}
+	
+	public Long getLong(K key) {
+		return getLong(key, -1l);
+	}
+	
+	public Long getLong(K key, Long defaultVal) {
+		Object obj = map.get(key);
+		if (obj == null) {
+			return defaultVal;
+		}
+		if (obj instanceof Long) {
+			return (Long)obj;
+		}
+		else if (obj instanceof Integer) {
+			return Long.valueOf((Integer)obj);
+		}
+		else if (obj instanceof String) {
+			return Long.valueOf((String)obj);
+		}
+		else {
+			return defaultVal;
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	public Collection<String> getStringCollection(K key) {
+		Object obj = map.get(key);
+		return (Collection<String>)obj;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public Collection<String> getStringCollection(K key, Collection<String> defaultVal) {
+		Object obj = map.get(key);
+		if (obj == null) {
+			return defaultVal;
+		}
+		
+		return (Collection<String>)obj;
+	}
+	
+	
+	@SuppressWarnings("unchecked")
+	public <C> Collection<C> getCollection(K key) {
+		Object obj = map.get(key);
+		if (obj instanceof Collection) {
+			return (Collection<C>)obj;
+		}
+		return null;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <L> List<L> getList(K key) {
+		Object obj = map.get(key);
+		if (obj instanceof List) {
+			return (List<L>)obj;
+		}
+		return null;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <L> List<L> getList(K key, List<L> defaultVal) {
+		Object obj = map.get(key);
+		if (obj instanceof List) {
+			return (List<L>)obj;
+		}
+		return defaultVal;
+	}
+	
+	public Object getObject(K key) {
+		return map.get(key);
+	}
+	
+	public Map<K, V> getMap() {
+		return map;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <S, T> Map<S,T> getMap(K key) {
+		return (Map<S,T>)map.get(key);
+	}
+	
+	public boolean containsKey(K key) {
+		return map.containsKey(key);
+	}
+}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4cf171a..785adbf 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -28,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionAttempt;
 import azkaban.executor.ExecutionOptions;
@@ -690,51 +692,58 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			ret.put("resume", e.getMessage());
 		}
 	}
-
-	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req,
-			HttpServletResponse resp, HashMap<String, Object> ret, User user,
-			ExecutableFlow exFlow) throws ServletException {
-		Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
-		System.out.println("Fetching " + exFlow.getExecutionId());
-
-		Project project = getProjectAjaxByPermission(ret,
-				exFlow.getProjectId(), user, Type.READ);
-		if (project == null) {
-			return;
-		}
-
+	
+	private long fillUpdateExecutableFlowInfo(ExecutableFlowBase flow, long lastUpdateTime, HashMap<String, Object> ret) {
 		// Just update the nodes and flow states
 		ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
-		for (ExecutableNode node : exFlow.getExecutableNodes()) {
-			if (node.getUpdateTime() <= lastUpdateTime) {
+		HashMap<String, Map<String,Object>> nodeMap = new HashMap<String, Map<String,Object>>();
+		
+		long updateTime = flow.getUpdateTime();
+		for (ExecutableNode node : flow.getExecutableNodes()) {
+			HashMap<String, Object> nodeObj = null;
+			if (node instanceof ExecutableFlowBase) {
+				nodeObj = new HashMap<String, Object>();
+				long subUpdateTime = fillUpdateExecutableFlowInfo((ExecutableFlowBase)node, lastUpdateTime, nodeObj);
+				updateTime = Math.max(updateTime, subUpdateTime);
+				if (updateTime <= lastUpdateTime) {
+					continue;
+				}
+			}
+			else if (node.getUpdateTime() <= lastUpdateTime){
 				continue;
 			}
-
-			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
-			nodeObj.put("id", node.getId());
-			nodeObj.put("status", node.getStatus());
-			nodeObj.put("startTime", node.getStartTime());
-			nodeObj.put("endTime", node.getEndTime());
-			nodeObj.put("attempt", node.getAttempt());
-
-			if (node.getAttempt() > 0) {
-				nodeObj.put("pastAttempts", node.getAttemptObjects());
+			else {
+				nodeObj = new HashMap<String, Object>();
+				updateTime = Math.max(updateTime, node.getUpdateTime());
+	
+				nodeObj.put("id", node.getId());
+				nodeObj.put("status", node.getStatus());
+				nodeObj.put("startTime", node.getStartTime());
+				nodeObj.put("endTime", node.getEndTime());
+				nodeObj.put("updateTime", node.getUpdateTime());
+				nodeObj.put("attempt", node.getAttempt());
+	
+				if (node.getAttempt() > 0) {
+					nodeObj.put("pastAttempts", node.getAttemptObjects());
+				}
 			}
-
+			
+			nodeMap.put(node.getId(), nodeObj);
 			nodeList.add(nodeObj);
 		}
 
 		ret.put("nodes", nodeList);
-		ret.put("status", exFlow.getStatus().toString());
-		ret.put("startTime", exFlow.getStartTime());
-		ret.put("endTime", exFlow.getEndTime());
-		ret.put("submitTime", exFlow.getSubmitTime());
-		ret.put("updateTime", exFlow.getUpdateTime());
+		ret.put("status", flow.getStatus().toString());
+		ret.put("startTime", flow.getStartTime());
+		ret.put("endTime", flow.getEndTime());
+		ret.put("updateTime", updateTime);
+		return updateTime;
 	}
-
-	private void ajaxFetchExecutableFlow(HttpServletRequest req,
+	
+	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req,
 			HttpServletResponse resp, HashMap<String, Object> ret, User user,
 			ExecutableFlow exFlow) throws ServletException {
+		Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
 		System.out.println("Fetching " + exFlow.getExecutionId());
 
 		Project project = getProjectAjaxByPermission(ret,
@@ -742,16 +751,27 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		if (project == null) {
 			return;
 		}
+		
+		fillUpdateExecutableFlowInfo(exFlow, lastUpdateTime, ret);
+	}
 
+	private long fillExecutableFlowInfo(ExecutableFlowBase flow, HashMap<String, Object> ret) {
+		long updateTime = flow.getUpdateTime();
+		
 		ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
 		ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String, Object>>();
-		for (ExecutableNode node : exFlow.getExecutableNodes()) {
+		
+		ArrayList<String> executorQueue = new ArrayList<String>();
+		executorQueue.addAll(flow.getStartNodes());
+
+		for (ExecutableNode node : flow.getExecutableNodes()) {
 			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
 			nodeObj.put("id", node.getId());
 			nodeObj.put("status", node.getStatus());
 			nodeObj.put("startTime", node.getStartTime());
 			nodeObj.put("endTime", node.getEndTime());
-
+			nodeObj.put("type", node.getType());
+			
 			// Add past attempts
 			if (node.getPastAttemptList() != null) {
 				ArrayList<Object> pastAttempts = new ArrayList<Object>();
@@ -760,9 +780,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				}
 				nodeObj.put("pastAttempts", pastAttempts);
 			}
-
+	
 			nodeList.add(nodeObj);
-
+	
 			// Add edges
 			for (String out : node.getOutNodes()) {
 				HashMap<String, Object> edgeObj = new HashMap<String, Object>();
@@ -770,15 +790,84 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				edgeObj.put("target", out);
 				edgeList.add(edgeObj);
 			}
+			
+			// If it's an embedded flow, add the embedded flow info
+			if (node instanceof ExecutableFlowBase) {
+				long subUpdateTime = fillExecutableFlowInfo((ExecutableFlowBase)node, nodeObj);
+				updateTime = Math.max(updateTime, subUpdateTime);
+			}
+			else {
+				nodeObj.put("updateTime", updateTime);
+			}
 		}
-
+		
 		ret.put("nodes", nodeList);
 		ret.put("edges", edgeList);
-		ret.put("status", exFlow.getStatus().toString());
-		ret.put("startTime", exFlow.getStartTime());
-		ret.put("endTime", exFlow.getEndTime());
+		ret.put("status", flow.getStatus().toString());
+		ret.put("startTime", flow.getStartTime());
+		ret.put("endTime", flow.getEndTime());
+		ret.put("updateTime", updateTime);
+		return updateTime;
+	}
+	
+	private void ajaxFetchExecutableFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		System.out.println("Fetching " + exFlow.getExecutionId());
+
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.READ);
+		if (project == null) {
+			return;
+		}
+
+		fillExecutableFlowInfo(exFlow, ret);
 		ret.put("submitTime", exFlow.getSubmitTime());
 		ret.put("submitUser", exFlow.getSubmitUser());
+//		
+//		
+//		ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+//		ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String, Object>>();
+//		for (ExecutableNode node : exFlow.getExecutableNodes()) {
+//			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+//			nodeObj.put("id", node.getId());
+//			nodeObj.put("status", node.getStatus());
+//			nodeObj.put("startTime", node.getStartTime());
+//			nodeObj.put("endTime", node.getEndTime());
+//			nodeObj.put("type", node.getType());
+//			
+//			// Add past attempts
+//			if (node.getPastAttemptList() != null) {
+//				ArrayList<Object> pastAttempts = new ArrayList<Object>();
+//				for (ExecutionAttempt attempt : node.getPastAttemptList()) {
+//					pastAttempts.add(attempt.toObject());
+//				}
+//				nodeObj.put("pastAttempts", pastAttempts);
+//			}
+//
+//			nodeList.add(nodeObj);
+//
+//			// Add edges
+//			for (String out : node.getOutNodes()) {
+//				HashMap<String, Object> edgeObj = new HashMap<String, Object>();
+//				edgeObj.put("from", node.getId());
+//				edgeObj.put("target", out);
+//				edgeList.add(edgeObj);
+//			}
+//			
+//			// If it's an embedded flow, add the embedded flow info
+//			if (node instanceof ExecutableFlowBase) {
+//				
+//			}
+//		}
+//
+//		ret.put("nodes", nodeList);
+//		ret.put("edges", edgeList);
+//		ret.put("status", exFlow.getStatus().toString());
+//		ret.put("startTime", exFlow.getStartTime());
+//		ret.put("endTime", exFlow.getEndTime());
+//		ret.put("submitTime", exFlow.getSubmitTime());
+//		ret.put("submitUser", exFlow.getSubmitUser());
 	}
 
 	private void ajaxAttemptExecuteFlow(HttpServletRequest req,
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 4abde57..2a4458d 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -511,7 +512,46 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	private void ajaxFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
 		String flowId = getParam(req, "flow");
 		
-		fillFlowInfo(project, flowId, ret);
+		fillFlowInfo2(project, flowId, ret);
+	}
+	
+	private void fillFlowInfo2(Project project, String flowId, HashMap<String, Object> ret) {
+		Flow flow = project.getFlow(flowId);
+		
+		ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+		for (Node node: flow.getNodes()) {
+			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+			nodeObj.put("id", node.getId());
+			nodeObj.put("type", node.getType());
+			if (node.getEmbeddedFlowId() != null) {
+				nodeObj.put("flowId", node.getEmbeddedFlowId());
+//				HashMap<String, Object> embeddedNodeObj = new HashMap<String, Object>();
+//				fillFlowInfo2(project, node.getEmbeddedFlowId(), embeddedNodeObj);
+//				nodeObj.put("flowData", embeddedNodeObj);
+			}
+			
+			nodeList.add(nodeObj);
+			Set<Edge> inEdges = flow.getInEdges(node.getId());
+			if (inEdges != null && !inEdges.isEmpty()) {
+				ArrayList<String> inEdgesList = new ArrayList<String>();
+				for (Edge edge: inEdges) {
+					inEdgesList.add(edge.getSourceId());
+				}
+				Collections.sort(inEdgesList);
+				nodeObj.put("in", inEdgesList);
+			}
+		}
+		
+		Collections.sort(nodeList, new Comparator<Map<String, Object>>() {
+			@Override
+			public int compare(Map<String, Object> o1, Map<String, Object> o2) {
+				String id = (String)o1.get("id");
+				return id.compareTo((String)o2.get("id"));
+			}
+		});
+
+		ret.put("flow", flowId);
+		ret.put("nodes", nodeList);
 	}
 	
 	private void fillFlowInfo(Project project, String flowId, HashMap<String, Object> ret) {
@@ -584,7 +624,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		if (node.getType().equals("flow")) {
 			if (node.getEmbeddedFlowId() != null) {
 				HashMap<String, Object> flowMap = new HashMap<String, Object>();
-				fillFlowInfo(project, node.getEmbeddedFlowId(), flowMap);
+				fillFlowInfo2(project, node.getEmbeddedFlowId(), flowMap);
 				ret.put("flowData", flowMap);
 			}
 		}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 7a03448..0bca980 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -31,6 +31,7 @@
 		<script type="text/javascript" src="${context}/js/azkaban.job.status.utils.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.svg.exflow.helper.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.flow.job.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.svg.graph.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.exflow.view.js"></script>
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 82101cb..67cde66 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -31,7 +31,7 @@
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
 		<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
-		<script type="text/javascript" src="${context}/js/azkaban.svg.graph.helper.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.svg.flow.loader.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.svg.graph.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.flow.extended.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.flow.job.view.js"></script>
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 8a608f6..c57c2f7 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -27,6 +27,7 @@
 		<script type="text/javascript" src="${context}/js/jquery.simplemodal-1.4.4.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.svg.graph.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.project.view.js"></script>
 
 		<link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui-1.10.1.custom.css" />
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 388c42e..8e3c8d0 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -627,54 +627,6 @@ var logUpdaterFunction = function() {
 	}
 }
 
-var exNodeClickCallback = function(event) {
-	console.log("Node clicked callback");
-	var jobId = event.currentTarget.jobid;
-	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
-
-	var menu = [	
-			{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
-			{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
-			{break: 1},
-			{title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
-	];
-
-	contextMenuView.show(event, menu);
-}
-
-var exJobClickCallback = function(event) {
-	console.log("Node clicked callback");
-	var jobId = event.currentTarget.jobid;
-	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
-
-	var menu = [	
-			{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
-			{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
-			{break: 1},
-			{title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
-	];
-
-	contextMenuView.show(event, menu);
-}
-
-var exEdgeClickCallback = function(event) {
-	console.log("Edge clicked callback");
-}
-
-var exGraphClickCallback = function(event) {
-	console.log("Graph clicked callback");
-	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId;
-
-	var menu = [	
-		{title: "Open Flow...", callback: function() {window.location.href=requestURL;}},
-		{title: "Open Flow in New Window...", callback: function() {window.open(requestURL);}},
-		{break: 1},
-		{title: "Center Graph", callback: function() {graphModel.trigger("resetPanZoom");}}
-	];
-	
-	contextMenuView.show(event, menu);
-}
-
 var attemptRightClick = function(event) {
 	var target = event.currentTarget;
 	var job = target.job;
@@ -699,8 +651,8 @@ $(function() {
 	logModel = new azkaban.LogModel();
 	
 	flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), model: graphModel});
-	mainSvgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick:  { "node": exNodeClickCallback, "edge": exEdgeClickCallback, "graph": exGraphClickCallback }});
-	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, contextMenuCallback: exJobClickCallback});
+	mainSvgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick:  { "node": nodeClickCallback, "edge": edgeClickCallback, "graph": graphClickCallback }});
+	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, contextMenuCallback: nodeClickCallback});
 	flowLogView = new azkaban.FlowLogView({el:$('#flowLogView'), model: logModel});
 	statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});
 	
@@ -712,40 +664,11 @@ $(function() {
 	      requestURL,
 	      {"execid": execId, "ajax":"fetchexecflow"},
 	      function(data) {
-	          console.log("data fetched");
-	          graphModel.set({data: data});
-	          graphModel.set({disabled: {}});
+	    	  console.log("data fetched");
+	    	  createModelFromAjaxCall(data, graphModel);
 	          graphModel.trigger("change:graph");
-	          
-	          updateTime = Math.max(updateTime, data.submitTime);
-	          updateTime = Math.max(updateTime, data.startTime);
-	          updateTime = Math.max(updateTime, data.endTime);
-	          
-	          var nodeMap = {};
-	          for (var i = 0; i < data.nodes.length; ++i) {
-	             var node = data.nodes[i];
-	             nodeMap[node.id] = node;
-	             updateTime = Math.max(updateTime, node.startTime);
-	             updateTime = Math.max(updateTime, node.endTime);
-	          }
-	          for (var i = 0; i < data.edges.length; ++i) {
-	          	 var edge = data.edges[i];
-	          	 
-	          	 if (!nodeMap[edge.target].in) {
-	          	 	nodeMap[edge.target].in = {};
-	          	 }
-	          	 var targetInMap = nodeMap[edge.target].in;
-	          	 targetInMap[edge.from] = nodeMap[edge.from];
-	          	 
-	          	 if (!nodeMap[edge.from].out) {
-	          	 	nodeMap[edge.from].out = {};
-	          	 }
-	          	 var sourceOutMap = nodeMap[edge.from].out;
-	          	 sourceOutMap[edge.target] = nodeMap[edge.target];
-	          }
-	          
-	          graphModel.set({nodeMap: nodeMap});
-	          
+	          updateTime = data.updateTime;
+
 	          if (window.location.hash) {
 					var hash = window.location.hash;
 					if (hash == "#jobslist") {
diff --git a/src/web/js/azkaban.flow.extended.view.js b/src/web/js/azkaban.flow.extended.view.js
index 21985b3..aa38d34 100644
--- a/src/web/js/azkaban.flow.extended.view.js
+++ b/src/web/js/azkaban.flow.extended.view.js
@@ -6,9 +6,7 @@ azkaban.FlowExtendedViewPanel = Backbone.View.extend({
 		//this.model.bind('change:flowinfo', this.changeFlowInfo, this);
 		$(this.el).show();
 		$(this.el).draggable({cancel: ".dataContent", containment: "document"});
-		
-		this.extendedViewPanels = {};
-		this.extendedDataModels = {};
+
 		this.render();
 		$(this.el).hide();
 	},
@@ -52,10 +50,7 @@ azkaban.FlowExtendedViewPanel = Backbone.View.extend({
 			$(svgDataFlow).append(svgGraph);
 			$(svgDataFlow).resizable();
 			
-			this.innerGraphModel = new azkaban.GraphModel();
-			this.innerGraphModel.set({"data": this.model.get("flow")});
-			
-			this.graphView = new azkaban.SvgGraphView({el: svgDataFlow, model: this.innerGraphModel, render: true, rightClick:  { "node": nodeClickCallback, "graph": graphClickCallback }})
+			this.graphView = new azkaban.SvgGraphView({el: svgDataFlow, model: this.model, render: true, rightClick:  { "node": nodeClickCallback, "graph": graphClickCallback }})
 		}
 		else {
 			$(this.el).find(".dataFlow").hide();
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 73945f0..010291a 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -298,6 +298,7 @@ $(function() {
 
 	// Set up the Flow options view. Create a new one every time :p
 	 $('#executebtn').click( function() {
+		 closeAllSubDisplays();
 	  	var data = graphModel.get("data");
 	  	var nodes = data.nodes;
 	  
@@ -314,7 +315,7 @@ $(function() {
 	      requestURL,
 	      {"project": projectName, "ajax":"fetchflowgraph", "flow":flowId},
 	      function(data) {
-	    	  createModelFromAjaxCall(data, graphModel);
+	    	  parseFlowData(data, graphModel);
 	          graphModel.trigger("change:graph");
 	          
 	          // Handle the hash changes here so the graph finishes rendering first.
diff --git a/src/web/js/azkaban.layout.js b/src/web/js/azkaban.layout.js
index 8ebe486..07debec 100644
--- a/src/web/js/azkaban.layout.js
+++ b/src/web/js/azkaban.layout.js
@@ -4,16 +4,59 @@ var degreeRatio = 1/8;
 var maxHeight = 200;
 var cornerGap = 10;
 
-function layoutGraph(nodes, edges, hmargin) {
-	var startLayer = [];
-	var numLayer = 0;
-	var nodeMap = {};
-	
+var idSort = function(a, b) {
+	if ( a.id < b.id ) {
+		return -1;
+	}
+	else if ( a.id > b.id ) {
+		return 1;
+	}
+	else {
+		return 0;
+	}
+}
+
+function prepareLayout(nodes, hmargin, layers, nodeMap) {
 	var maxLayer = 0;
-	var layers = {};
+	var numLayer = 0;
+	var nodeQueue = new Array();
+	// Find start layers first
+	for (var i=0; i < nodes.length; ++i) {
+		var node = nodes[i];
+		if (node.inNodes) {
+			// We sort here. Why? To keep the node drawing consistent
+			node.in.sort(idSort);
+		}
+		else {
+			// We sort here. Why? To keep it up and running.
+			nodeQueue.push(node);
+		}
+	}
+	// Sort here. To keep the node drawing consistent
+	nodes.sort(idSort);
 	
-	if (!hmargin) {
-		hmargin = 8;
+	// calculate level
+	// breath first search the sucker
+	var index = 0;
+	while(index < nodeQueue.length) {
+		var node = nodeQueue[index];
+		if (node.inNodes) {
+			var level = 0;
+			for (var key in node.inNodes) {
+				level = Math.max(level, node.inNodes[key].level);
+			}
+			node.level = level + 1;
+		}
+		else {
+			node.level = 0;
+		}
+		
+		if (node.outNodes) {
+			for (var key in node.outNodes) {
+				nodeQueue.push(node.outNodes[key]);
+			}
+		}
+		index++;
 	}
 	
 	// Assign to layers
@@ -33,13 +76,30 @@ function layoutGraph(nodes, edges, hmargin) {
 		layers[node.level].push(node);
 	}
 	
+	layers.numLayer = numLayer;
+	layers.maxLayer = maxLayer;
+}
+
+function layoutGraph(nodes, edges, hmargin) {
+	var startLayer = [];
+
+	var nodeMap = {};
+	var layers = {};
+	
+	if (!hmargin) {
+		hmargin = 8;
+	}
+	
+	prepareLayout(nodes, hmargin, layers, nodeMap);
+	var maxLayer = layers.maxLayer;
+	var numLayer = layers.numLayer;
+	
 	// Create dummy nodes
 	var edgeDummies = {};
-	
 	for (var i=0; i < edges.length; ++i ) {
 		var edge = edges[i];
 		var src = edges[i].from;
-		var dest = edges[i].target;
+		var dest = edges[i].to;
 		
 		var edgeId = src + ">>" + dest;
 		
@@ -96,7 +156,6 @@ function layoutGraph(nodes, edges, hmargin) {
 		spreadLayerSmart(layers[i]);
 	}
 	
-
 	// Space it vertically
 	spaceVertically(layers, maxLayer);
 	
@@ -107,12 +166,12 @@ function layoutGraph(nodes, edges, hmargin) {
 		node.x = layerNode.x;
 		node.y = layerNode.y;
 	}
-	
+
 	// Dummy node for more points.
 	for (var i = 0; i < edges.length; ++i) {
 		var edge = edges[i];
 		var src = edges[i].from;
-		var dest = edges[i].target;
+		var dest = edges[i].to;
 		
 		var edgeId = src + ">>" + dest;
 
diff --git a/src/web/js/azkaban.svg.exflow.helper.js b/src/web/js/azkaban.svg.exflow.helper.js
new file mode 100644
index 0000000..6604020
--- /dev/null
+++ b/src/web/js/azkaban.svg.exflow.helper.js
@@ -0,0 +1,200 @@
+var extendedViewPanels = {};
+var extendedDataModels = {};
+var openJobDisplayCallback = function(nodeId, flowId, evt) {
+	console.log("Open up data");
+	
+	var nodeInfoPanelID = flowId + ":" + nodeId + "-info";
+	if ($("#" + nodeInfoPanelID).length) {
+		$("#flowInfoBase").before(cloneStuff);
+		extendedViewPanels[nodeInfoPanelID].showExtendedView(evt);
+		return;
+	}
+	
+	var cloneStuff = $("#flowInfoBase").clone();
+	$(cloneStuff).attr("id", nodeInfoPanelID);
+	
+	
+	
+	/*
+	$("#flowInfoBase").before(cloneStuff);
+	var requestURL = contextURL + "/manager";
+	
+	$.get(
+      requestURL,
+      {"project": projectName, "ajax":"fetchflownodedata", "flow":flowId, "node": nodeId},
+      function(data) {
+  		var graphModel = new azkaban.GraphModel();
+  		graphModel.set({id: data.id, flow: data.flowData, type: data.type, props: data.props});
+
+  		var flowData = data.flowData;
+  		if (flowData) {
+  			createModelFromAjaxCall(flowData, graphModel);
+  		}
+  		
+  		var backboneView = new azkaban.FlowExtendedViewPanel({el:cloneStuff, model: graphModel});
+  		extendedViewPanels[nodeInfoPanelID] = backboneView;
+  		extendedDataModels[nodeInfoPanelID] = graphModel;
+  		backboneView.showExtendedView(evt);
+      },
+      "json"
+    );
+    */
+}
+
+var extendedDataModels = {};
+var createModelFromAjaxCall = function(data, model) {
+	  var nodes = {};
+  	  for (var i=0; i < data.nodes.length; ++i) {
+  		var graphModel = new azkaban.GraphModel();
+  	  	var node = data.nodes[i];
+  	  	nodes[node.id] = node;
+  	  }
+
+  	  for (var i=0; i < data.edges.length; ++i) {
+  	  	var edge = data.edges[i];
+  	  	var fromNode = nodes[edge.from];
+  	  	var toNode = nodes[edge.target];
+  	  	
+  	  	if (!fromNode.outNodes) {
+  	  		fromNode.outNodes = {};
+  	  	}
+  	  	fromNode.outNodes[toNode.id] = toNode;
+  	  	
+  	  	if (!toNode.inNodes) {
+  	  		toNode.inNodes = {};
+  	  	}
+  	  	toNode.inNodes[fromNode.id] = fromNode;
+  	  }
+  
+      var nodeQueue = new Array();
+      for (var key in nodes) {
+          if (!nodes[key].inNodes) {
+             nodeQueue.push(nodes[key]);
+          }
+      }
+  	  
+      // calculate level
+      // breath first search the sucker
+      var index = 0;
+      while(index < nodeQueue.length) {
+          var node = nodeQueue[index];
+          if (node.inNodes) {
+              var level = 0;
+        	  for (var key in node.inNodes) {
+        		  level = Math.max(level, node.inNodes[key].level);
+        	  }
+              node.level = level + 1;
+          }
+          else {
+              node.level = 0;
+          }
+          
+          if (node.outNodes) {
+             for (var key in node.outNodes) {
+                 nodeQueue.push(node.outNodes[key]);
+             }
+          }
+          index++;
+      }
+      
+      for (var key in nodes) {
+    	  var node = nodes[key];
+    	  
+    	  if (node.type == "flow") {
+    		  var graphModel = new azkaban.GraphModel();
+    		  createModelFromAjaxCall(node, graphModel);
+    		  extendedDataModels["test"] = graphModel;
+    	  }
+      }
+
+      console.log("data fetched");
+      model.set({data: data});
+      model.set({nodes: nodes});
+      model.set({disabled: {}});
+}
+
+var nodeClickCallback = function(event, model, type) {
+	console.log("Node clicked callback");
+	var jobId = event.currentTarget.jobid;
+	var flowId = model.get("flowId");
+	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+	if (event.currentTarget.jobtype == "flow") {
+		var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+		menu = [
+				{title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+				{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+				{break: 1},
+				{title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+		];
+	}
+	else {
+		menu = [
+				{title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Job", callback: function() {model.trigger("centerNode", jobId)}}
+		];
+	}
+	contextMenuView.show(event, menu);
+}
+
+var jobClickCallback = function(event, model) {
+	console.log("Node clicked callback");
+	var jobId = event.currentTarget.jobid;
+	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+	var menu;
+	if (event.currentTarget.jobtype == "flow") {
+		var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+		menu = [
+				{title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+				{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+				{break: 1},
+				{title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+		];
+	}
+	else {
+		menu = [
+				{title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
+		];
+	}
+	contextMenuView.show(event, menu);
+}
+
+var edgeClickCallback = function(event, model) {
+	console.log("Edge clicked callback");
+}
+
+var graphClickCallback = function(event, model) {
+	console.log("Graph clicked callback");
+	var jobId = event.currentTarget.jobid;
+	var flowId = model.get("flowId");
+	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId;
+
+	var menu = [	
+		{title: "Open Flow...", callback: function() {window.location.href=requestURL;}},
+		{title: "Open Flow in New Window...", callback: function() {window.open(requestURL);}},
+		{break: 1},
+		{title: "Center Graph", callback: function() {model.trigger("resetPanZoom");}}
+	];
+	
+	contextMenuView.show(event, menu);
+}
diff --git a/src/web/js/azkaban.svg.flow.loader.js b/src/web/js/azkaban.svg.flow.loader.js
new file mode 100644
index 0000000..801877c
--- /dev/null
+++ b/src/web/js/azkaban.svg.flow.loader.js
@@ -0,0 +1,234 @@
+var extendedViewPanels = {};
+var extendedDataModels = {};
+var openJobDisplayCallback = function(nodeId, flowId, evt) {
+	console.log("Open up data");
+	var target = evt.currentTarget;
+	var node = target.nodeobj;
+	
+	// If target panel exists, than we display and skip.
+	var targetPanel = node.panel;
+	if (targetPanel) {
+		$("#flowInfoBase").before(targetPanel);
+		targetPanel.showExtendedView(evt);
+	}
+	else {
+		var targetModel = node.dataModel;
+		var flowId = flowId;
+		
+		if (!targetModel) {
+			var requestURL = contextURL + "/manager";
+			var newParentPath = node.parentPath ? node.parentPath + ":" + flowId : flowId;
+			node.parentPath = newParentPath;
+			
+			$.get(
+		      requestURL,
+		      {"project": projectName, "ajax":"fetchflownodedata", "flow":flowId, "node": node.id},
+		      function(data) {
+		  		var graphModel = new azkaban.GraphModel();
+		  		graphModel.set({id: data.id, flow: data.flowData, type: data.type, props: data.props});
+				
+		  		var flowData = data.flowData;
+		  		if (flowData) {
+		  			parseFlowData(flowData, graphModel, newParentPath);
+		  		}
+		  		
+		  		node.dataModel = graphModel;
+		  		createNewPanel(node, graphModel, evt);
+		      },
+		      "json"
+		    );
+		}
+		else {
+			createNewPanel(node, targetModel, evt);
+		}
+	}
+
+	/*
+	$("#flowInfoBase").before(cloneStuff);
+	var requestURL = contextURL + "/manager";
+	
+	$.get(
+      requestURL,
+      {"project": projectName, "ajax":"fetchflownodedata", "flow":flowId, "node": nodeId},
+      function(data) {
+  		var graphModel = new azkaban.GraphModel();
+  		graphModel.set({id: data.id, flow: data.flowData, type: data.type, props: data.props});
+
+  		var flowData = data.flowData;
+  		if (flowData) {
+  			createModelFromAjaxCall(flowData, graphModel);
+  		}
+  		
+  		var backboneView = new azkaban.FlowExtendedViewPanel({el:cloneStuff, model: graphModel});
+  		extendedViewPanels[nodeInfoPanelID] = backboneView;
+  		extendedDataModels[nodeInfoPanelID] = graphModel;
+  		backboneView.showExtendedView(evt);
+      },
+      "json"
+    );
+    */
+}
+
+var createNewPanel = function(node, model, evt) {
+	var parentPath = node.parentPath;
+	
+	var nodeInfoPanelID = parentPath ? parentPath + ":" + node.id + "-info" : node.id + "-info";
+	var cloneStuff = $("#flowInfoBase").clone();
+	cloneStuff.nodeobj = node;
+	$(cloneStuff).attr("id", nodeInfoPanelID);
+	$("#flowInfoBase").before(cloneStuff);
+	
+	var backboneView = new azkaban.FlowExtendedViewPanel({el:cloneStuff, model: model});
+	node.panel = backboneView;
+	backboneView.showExtendedView(evt);
+}
+
+var parseFlowData = function(data, model, parentPath) {
+	var nodes = {};
+	var edges = new Array();
+	for (var i=0; i < data.nodes.length; ++i) {
+		var node = data.nodes[i];
+		nodes[node.id] = node;
+	}
+
+    var nodeQueue = new Array();
+	for (var i=0; i < data.nodes.length; ++i) {
+		var node = data.nodes[i];
+		if (node.in) {
+			for (var j=0; j < node.in.length; ++j) {
+				var fromNode = nodes[node.in[j]];
+				if (!fromNode.outNodes) {
+					fromNode.outNodes = {};
+				}
+				if (!node.inNodes) {
+					node.inNodes = {};
+				}
+				
+				fromNode.outNodes[node.id] = node;
+				node.inNodes[fromNode.id] = fromNode;
+				edges.push({to: node.id, from: fromNode.id});
+			}
+		}
+		else {
+			// Queue used for breath first.
+			nodeQueue.push(node);
+		}
+	}
+
+	// Iterate over the nodes again
+	var embeddedFlows = {};
+	var newParentPath = parentPath ? parentPath + ":" + data.flow : data.flow;
+	
+	for (var key in nodes) {
+		var node = nodes[key];
+		node.parentPath = newParentPath;
+		if (node.type == "flow" && node.flowData) {
+			var graphModel = new azkaban.GraphModel();
+
+			node.flowData.id = node.id;
+			node.flowData.flowId = node.flowId;
+			parseFlowData(node.flowData, graphModel, newParentPath);
+			graphModel.set({id: node.id, flow: node.flowData, type: node.type, props: node.props});
+			graphModel.set({isEmbedded: true});
+			node.dataModel = graphModel;
+		}
+	}
+	
+	console.log("data fetched");
+	model.set({flow: data.flow});
+	model.set({data: data});
+	model.set({nodes: nodes});
+	model.set({edges: edges});
+	model.set({disabled: {}});
+}
+
+var closeAllSubDisplays = function() {
+	$(".flowExtendedView").hide();
+}
+
+var nodeClickCallback = function(event, model, type) {
+	console.log("Node clicked callback");
+	var target = event.currentTarget;
+	var jobId = target.jobid;
+	var flowId = model.get("flow");
+	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+	if (event.currentTarget.jobtype == "flow") {
+		var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+		menu = [
+				{title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+				{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+				{break: 1},
+				{title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+		];
+	}
+	else {
+		menu = [
+				{title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Job", callback: function() {model.trigger("centerNode", jobId)}}
+		];
+	}
+	contextMenuView.show(event, menu);
+}
+
+var jobClickCallback = function(event, model) {
+	console.log("Node clicked callback");
+	var jobId = event.currentTarget.jobid;
+	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+	var menu;
+	if (event.currentTarget.jobtype == "flow") {
+		var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+		menu = [
+				{title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+				{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+				{break: 1},
+				{title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+		];
+	}
+	else {
+		menu = [
+				{title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+				{break: 1},
+				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+				{break: 1},
+				{title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
+		];
+	}
+	contextMenuView.show(event, menu);
+}
+
+var edgeClickCallback = function(event, model) {
+	console.log("Edge clicked callback");
+}
+
+var graphClickCallback = function(event, model) {
+	console.log("Graph clicked callback");
+	var jobId = event.currentTarget.jobid;
+	var flowId = model.get("flowId");
+	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId;
+
+	var menu = [	
+		{title: "Open Flow...", callback: function() {window.location.href=requestURL;}},
+		{title: "Open Flow in New Window...", callback: function() {window.open(requestURL);}},
+		{break: 1},
+		{title: "Center Graph", callback: function() {model.trigger("resetPanZoom");}}
+	];
+	
+	contextMenuView.show(event, menu);
+}
diff --git a/src/web/js/azkaban.svg.graph.view.js b/src/web/js/azkaban.svg.graph.view.js
index 1e0f443..1bd78de 100644
--- a/src/web/js/azkaban.svg.graph.view.js
+++ b/src/web/js/azkaban.svg.graph.view.js
@@ -94,14 +94,12 @@ azkaban.SvgGraphView = Backbone.View.extend({
 
 		var data = this.model.get("data");
 		var nodes = data.nodes;
-		var edges = data.edges;
+		var edges = this.model.get("edges");
+		
 		if (nodes.length == 0) {
 			console.log("No results");
 			return;
 		};
-	
-		nodes.sort();
-		edges.sort();
 		
 		var bounds = {};
 		this.nodes = {};
@@ -120,10 +118,10 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		this.moveNodes(bounds);
 		
 		for (var i = 0; i < edges.length; ++i) {
-			var inNodes = this.nodes[edges[i].target].inNodes;
+			var inNodes = this.nodes[edges[i].to].inNodes;
 			if (!inNodes) {
 				inNodes = {};
-				this.nodes[edges[i].target].inNodes = inNodes;
+				this.nodes[edges[i].to].inNodes = inNodes;
 			}
 			inNodes[edges[i].from] = this.nodes[edges[i].from];
 			
@@ -132,12 +130,12 @@ azkaban.SvgGraphView = Backbone.View.extend({
 				outNodes = {};
 				this.nodes[edges[i].from].outNodes = outNodes;
 			}
-			outNodes[edges[i].target] = this.nodes[edges[i].target];
+			outNodes[edges[i].to] = this.nodes[edges[i].to];
 
 			this.drawEdge(this, edges[i]);
 		}
 		
-		this.model.set({"flowId":data.flowId, "nodes": this.nodes, "edges": edges});
+		this.model.set({"flowId":data.flowId, "edges": edges});
 		
 		var margin = this.graphMargin;
 		bounds.minX = bounds.minX ? bounds.minX - margin : -margin;
@@ -260,7 +258,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		var svgns = self.svgns;
 		
 		var startNode = this.nodes[edge.from];
-		var endNode = this.nodes[edge.target];
+		var endNode = this.nodes[edge.to];
 		
 		var startPointY = startNode.y + startNode.height/2 - 3;
 		var endPointY = endNode.y - endNode.height/2 + 3;
@@ -276,7 +274,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 			var polyLine = document.createElementNS(svgns, "polyline");
 			polyLine.setAttribute("class", "edge");
 			polyLine.setAttribute("points", pointString);
-			polyLine.setAttribute("style", "fill:none;");
+			polyLine.setAttribute("style", "fill:none;stroke-width:3");
 			$(self.mainG).prepend(polyLine);
 		}
 		else { 
@@ -286,7 +284,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 			line.setAttribute("y1", startPointY);
 			line.setAttribute("x2", endNode.x);
 			line.setAttribute("y2", endPointY);
-			
+			line.setAttribute("style", "stroke-width:3");
 			$(self.mainG).prepend(line);
 		}
 	},
@@ -296,7 +294,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		}
 		else {
 			this.drawBoxNode(self,node);
-			//this.drawCircleNode(self,node,bounds);
+			//this.drawCircleNode2(self,node);
 		}
 	},
 	moveNodes: function(bounds) {
@@ -362,7 +360,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		innerG.jobid = node.id;
 		innerG.jobtype = "flow";
 		innerG.flowId = node.flowId;
-
+		innerG.nodeobj = node;
+		
 		nodeG.appendChild(innerG);
 		self.mainG.appendChild(nodeG);
 
@@ -400,6 +399,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		
 		nodeG.setAttribute("class", "node");
 		nodeG.jobid=node.id;
+		nodeG.jobobj=node;
 	},
 	drawBoxNode: function(self, node) {
 		var svg = self.svgGraph;
@@ -432,7 +432,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		innerG.appendChild(rect);
 		innerG.appendChild(text);
 		innerG.jobid = node.id;
-
+		innerG.nodeobj = node;
+		
 		nodeG.appendChild(innerG);
 		self.mainG.appendChild(nodeG);
 
@@ -462,6 +463,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		
 		nodeG.setAttribute("class", "node");
 		nodeG.jobid=node.id;
+		nodeG.j=node;
 	},
 	drawCircleNode: function(self, node, bounds) {
 		var svg = self.svgGraph;
@@ -470,6 +472,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		var xOffset = 10;
 		var yOffset = 10;
 		
+		var height = 18;
 		var nodeG = document.createElementNS(svgns, "g");
 		nodeG.setAttribute("class", "jobnode");
 		nodeG.setAttribute("font-family", "helvetica");
@@ -484,6 +487,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		circle.setAttribute("cx", 10);
 		circle.setAttribute("r", 12);
 		circle.setAttribute("style", "width:inherit;stroke-opacity:1");
+		
 		//circle.setAttribute("class", "border");
 		//circle.setAttribute("class", "nodecontainer");
 		
@@ -492,8 +496,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		text.appendChild(textLabel);
 		text.setAttribute("x", 0);
 		text.setAttribute("y", 0);
-				
-		this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
+
+		//this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
 		
 		var backRect = document.createElementNS(svgns, 'rect');
 		backRect.setAttribute("x", 0);
@@ -510,6 +514,9 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		nodeG.appendChild(innerG);
 		self.mainG.appendChild(nodeG);
 
+		
+		var horizontalMargin = 8;
+		var verticalMargin = 2;
 		// Need to get text width after attaching to SVG.
 		var computeText = text.getComputedTextLength();
 		var halfWidth = computeText/2;
@@ -517,6 +524,88 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		backRect.setAttribute("x", -halfWidth);
 		backRect.setAttribute("width", computeText + 20);
 
+		// Margin for surrounding box.
+		var boxWidth = computeText + horizontalMargin * 2;
+		var boxHeight = height + verticalMargin * 2;
+		
+		node.width = boxWidth;
+		node.height = boxHeight;
+		node.centerX = boxWidth/2;
+		node.centerY = boxHeight/2;
+		
+		nodeG.setAttribute("class", "node");
+		nodeG.nodeobj=node;
+		nodeG.jobid=node.id;
+	},
+	drawCircleNode2: function(self, node, bounds) {
+		var svg = self.svgGraph;
+		var svgns = self.svgns;
+
+		var xOffset = 10;
+		var yOffset = 10;
+		
+		var height = 18;
+		var nodeG = document.createElementNS(svgns, "g");
+		nodeG.setAttribute("class", "jobnode");
+		nodeG.setAttribute("font-family", "helvetica");
+		nodeG.setAttribute("transform", "translate(" + node.x + "," + node.y + ")");
+		this.gNodes[node.id] = nodeG;
+		
+		var innerG = document.createElementNS(svgns, "g");
+		innerG.setAttribute("transform", "translate(-10,-10)");
+		
+		var circle = document.createElementNS(svgns, 'circle');
+		circle.setAttribute("cy", 10);
+		circle.setAttribute("cx", 10);
+		circle.setAttribute("r", 12);
+		circle.setAttribute("style", "width:inherit;stroke-opacity:1;stroke:rgb(100,100,100);stroke-width:5");
+		//circle.setAttribute("class", "border");
+		//circle.setAttribute("class", "nodecontainer");
+		
+		var text = document.createElementNS(svgns, 'text');
+		var textLabel = document.createTextNode(node.label);
+		text.appendChild(textLabel);
+		text.setAttribute("x", 0);
+		text.setAttribute("y", 0);
+
+		//this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
+		
+		var backRect = document.createElementNS(svgns, 'rect');
+		backRect.setAttribute("x", 0);
+		backRect.setAttribute("y", 2);
+		backRect.setAttribute("class", "backboard");
+		backRect.setAttribute("width", 10);
+		backRect.setAttribute("height", 15);
+		
+		innerG.appendChild(circle);
+		innerG.appendChild(backRect);
+//		innerG.appendChild(text);
+		innerG.jobid = node.id;
+
+		nodeG.appendChild(innerG);
+		self.mainG.appendChild(nodeG);
+
+		
+		var horizontalMargin = 8;
+		var verticalMargin = 2;
+		// Need to get text width after attaching to SVG.
+		var computeText = 150;
+		var halfWidth = computeText/2;
+		text.setAttribute("x", -halfWidth + 10);
+		backRect.setAttribute("x", -halfWidth);
+		backRect.setAttribute("width", computeText + 20);
+
+		// Margin for surrounding box.
+		var boxWidth = computeText + horizontalMargin * 2;
+		var boxHeight = height + verticalMargin * 2;
+		
+//		innerG.removeChild(text);
+		
+		node.width = boxWidth;
+		node.height = boxHeight;
+		node.centerX = boxWidth/2;
+		node.centerY = boxHeight/2;
+		
 		nodeG.setAttribute("class", "node");
 		nodeG.jobid=node.id;
 	},

unit/build.xml 13(+12 -1)

diff --git a/unit/build.xml b/unit/build.xml
index 64d1314..c9f3f20 100644
--- a/unit/build.xml
+++ b/unit/build.xml
@@ -97,5 +97,16 @@
 			<zipfileset dir="${base.dir}/unit/executions/embedded" />
 		</zip>
 	</target>
-	
+
+	<target name="package-embedded3" depends="jars" description="Creates a test zip">
+		<delete dir="${dist.packages.dir}" />
+		<mkdir dir="${dist.packages.dir}" />
+
+                <!-- Tarball it -->
+                <zip destfile="${dist.packages.dir}/embedded.zip">
+                	<zipfileset dir="${dist.jar.dir}" />
+                	<zipfileset dir="${base.dir}/unit/executions/embedded3" />
+                </zip>
+
+	</target>	
 </project>
diff --git a/unit/executions/embedded3/innerFlow.job b/unit/executions/embedded3/innerFlow.job
new file mode 100644
index 0000000..e9b3b89
--- /dev/null
+++ b/unit/executions/embedded3/innerFlow.job
@@ -0,0 +1,4 @@
+type=javaprocess
+seconds=1
+fail=false
+dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerFlow2.job b/unit/executions/embedded3/innerFlow2.job
new file mode 100644
index 0000000..2346982
--- /dev/null
+++ b/unit/executions/embedded3/innerFlow2.job
@@ -0,0 +1,4 @@
+type=javaprocess
+seconds=1
+fail=false
+dependencies=innerJobA
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerJobA.job b/unit/executions/embedded3/innerJobA.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embedded3/innerJobA.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/embedded3/innerJobB.job b/unit/executions/embedded3/innerJobB.job
new file mode 100644
index 0000000..24a2e04
--- /dev/null
+++ b/unit/executions/embedded3/innerJobB.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow2
+dependencies=innerJobA
diff --git a/unit/executions/embedded3/innerJobC.job b/unit/executions/embedded3/innerJobC.job
new file mode 100644
index 0000000..178bbef
--- /dev/null
+++ b/unit/executions/embedded3/innerJobC.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=innerJobA
diff --git a/unit/executions/embedded3/joba.job b/unit/executions/embedded3/joba.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embedded3/joba.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/embedded3/jobb.job b/unit/executions/embedded3/jobb.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded3/jobb.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embedded3/jobc.job b/unit/executions/embedded3/jobc.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded3/jobc.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embedded3/jobd.job b/unit/executions/embedded3/jobd.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded3/jobd.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embedded3/jobe.job b/unit/executions/embedded3/jobe.job
new file mode 100644
index 0000000..fe986d5
--- /dev/null
+++ b/unit/executions/embedded3/jobe.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=jobb,jobc,jobd
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index 9d755df..9df97ac 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -582,16 +582,16 @@ public class FlowRunnerPipelineTest {
 		FileUtils.copyDirectory(directory, workingDir);
 	}
 	
-	private void printCurrentState(String prefix, ExecutableFlowBase flow) {
-		for(ExecutableNode node: flow.getExecutableNodes()) {
-
-			System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
-			if (node instanceof ExecutableFlowBase) {
-				printCurrentState(prefix, (ExecutableFlowBase)node);
-			}
-		}
-	}
-	
+//	private void printCurrentState(String prefix, ExecutableFlowBase flow) {
+//		for(ExecutableNode node: flow.getExecutableNodes()) {
+//
+//			System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
+//			if (node instanceof ExecutableFlowBase) {
+//				printCurrentState(prefix, (ExecutableFlowBase)node);
+//			}
+//		}
+//	}
+//	
 	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName) throws Exception {
 		return createFlowRunner(eventCollector, flowName, groupName, new ExecutionOptions());
 	}
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 35fa39d..9031518 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -277,8 +277,6 @@ public class ExecutableFlowTest {
 		Assert.assertEquals(a.getOutNodes(), a.getOutNodes());
 	}
 	
-	
-	
 	public static void testEquals(ExecutionOptions optionsA, ExecutionOptions optionsB) {
 		Assert.assertEquals(optionsA.getConcurrentOption(), optionsB.getConcurrentOption());
 		Assert.assertEquals(optionsA.getNotifyOnFirstFailure(), optionsB.getNotifyOnFirstFailure());