azkaban-memoizeit
Changes
src/java/azkaban/executor/ExecutableFlow.java 36(+13 -23)
src/java/azkaban/executor/ExecutableNode.java 85(+41 -44)
src/java/azkaban/executor/ExecutionOptions.java 52(+20 -32)
src/java/azkaban/utils/TypedMapWrapper.java 141(+141 -0)
src/java/azkaban/webapp/servlet/ExecutorServlet.java 171(+130 -41)
src/web/js/azkaban.exflow.view.js 89(+6 -83)
src/web/js/azkaban.flow.view.js 3(+2 -1)
src/web/js/azkaban.layout.js 85(+72 -13)
src/web/js/azkaban.svg.exflow.helper.js 200(+200 -0)
src/web/js/azkaban.svg.flow.helper.js 97(+55 -42)
src/web/js/azkaban.svg.flow.loader.js 234(+234 -0)
src/web/js/azkaban.svg.graph.view.js 121(+105 -16)
unit/build.xml 13(+12 -1)
unit/executions/embedded3/joba.job 4(+4 -0)
unit/executions/embedded3/jobb.job 3(+3 -0)
unit/executions/embedded3/jobc.job 3(+3 -0)
unit/executions/embedded3/jobd.job 3(+3 -0)
unit/executions/embedded3/jobe.job 5(+5 -0)
Details
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 15a8bf8..3fff9c2 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -62,11 +62,11 @@ public interface ConnectorParams {
public static final int NODE_END_INDEX = 3;
public static final String UPDATE_TIME_LIST_PARAM = "updatetime";
- public static final String EXEC_ID_LIST_PARAM = "execid";
+ public static final String EXEC_ID_LIST_PARAM = "executionId";
public static final String FORCED_FAILED_MARKER = ".failed";
- public static final String UPDATE_MAP_EXEC_ID = "execId";
+ public static final String UPDATE_MAP_EXEC_ID = "executionId";
public static final String UPDATE_MAP_JOBID = "jobId";
public static final String UPDATE_MAP_UPDATE_TIME = "updateTime";
public static final String UPDATE_MAP_STATUS = "status";
src/java/azkaban/executor/ExecutableFlow.java 36(+13 -23)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 9ee6aa2..81ae8aa 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -19,13 +19,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import azkaban.flow.Flow;
-import azkaban.flow.FlowProps;
import azkaban.project.Project;
-import azkaban.utils.JSONUtils;
+import azkaban.utils.TypedMapWrapper;
public class ExecutableFlow extends ExecutableFlowBase {
public static final String EXECUTIONID_PARAM = "executionId";
@@ -192,30 +192,20 @@ public class ExecutableFlow extends ExecutableFlowBase {
}
@Override
- @SuppressWarnings("unchecked")
- public void fillExecutableFromMapObject(Map<String, Object> flowObj) {
+ public void fillExecutableFromMapObject(TypedMapWrapper<String, Object> flowObj) {
super.fillExecutableFromMapObject(flowObj);
- this.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
- this.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
-
- this.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
- if (flowObj.containsKey(SCHEDULEID_PARAM)) {
- this.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
- }
-
- if (flowObj.containsKey(SUBMITUSER_PARAM)) {
- this.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
- }
- else {
- this.submitUser = null;
- }
- this.version = (Integer)flowObj.get(VERSION_PARAM);
-
- this.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
+ this.executionId = flowObj.getInt(EXECUTIONID_PARAM);
+ this.executionPath = flowObj.getString(EXECUTIONPATH_PARAM);
+
+ this.projectId = flowObj.getInt(PROJECTID_PARAM);
+ this.scheduleId = flowObj.getInt(SCHEDULEID_PARAM);
+ this.submitUser = flowObj.getString(SUBMITUSER_PARAM);
+ this.version = flowObj.getInt(VERSION_PARAM);
+ this.submitTime = flowObj.getLong(SUBMITTIME_PARAM);
if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
- this.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
+ this.executionOptions = ExecutionOptions.createFromObject(flowObj.getObject(EXECUTIONOPTIONS_PARAM));
}
else {
// for backwards compatibility should remove in a few versions.
@@ -223,7 +213,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
}
if(flowObj.containsKey(PROXYUSERS_PARAM)) {
- ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get(PROXYUSERS_PARAM);
+ List<String> proxyUserList = flowObj.<String>getList(PROXYUSERS_PARAM);
this.addAllProxyUsers(proxyUserList);
}
}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 798f40a..040888b 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -27,6 +27,7 @@ import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
import azkaban.project.Project;
+import azkaban.utils.TypedMapWrapper;
public class ExecutableFlowBase extends ExecutableNode {
public static final String FLOW_ID_PARAM = "flowId";
@@ -206,24 +207,23 @@ public class ExecutableFlowBase extends ExecutableNode {
flowObjMap.put(PROPERTIES_PARAM, props);
}
- /**
- * Using the parameters in the map created from a json file, fill the results of this node
- */
- @SuppressWarnings("unchecked")
- public void fillExecutableFromMapObject(Map<String,Object> flowObjMap) {
+ @Override
+ public void fillExecutableFromMapObject(TypedMapWrapper<String,Object> flowObjMap) {
super.fillExecutableFromMapObject(flowObjMap);
- this.flowId = (String)flowObjMap.get(FLOW_ID_PARAM);
+ this.flowId = flowObjMap.getString(FLOW_ID_PARAM);
+ List<Object> nodes = flowObjMap.<Object>getList(NODES_PARAM);
- List<Object> nodes = (List<Object>)flowObjMap.get(NODES_PARAM);
if (nodes != null) {
for (Object nodeObj: nodes) {
+ @SuppressWarnings("unchecked")
Map<String,Object> nodeObjMap = (Map<String,Object>)nodeObj;
+ TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(nodeObjMap);
- String type = (String)nodeObjMap.get(TYPE_PARAM);
- if (type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
+ String type = wrapper.getString(TYPE_PARAM);
+ if (type != null && type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
ExecutableFlowBase exFlow = new ExecutableFlowBase();
- exFlow.fillExecutableFromMapObject(nodeObjMap);
+ exFlow.fillExecutableFromMapObject(wrapper);
exFlow.setParentFlow(this);
executableNodes.put(exFlow.getId(), exFlow);
@@ -238,8 +238,9 @@ public class ExecutableFlowBase extends ExecutableNode {
}
}
- List<Object> properties = (List<Object>)flowObjMap.get(PROPERTIES_PARAM);
+ List<Object> properties = flowObjMap.<Object>getList(PROPERTIES_PARAM);
for (Object propNode : properties) {
+ @SuppressWarnings("unchecked")
HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
String source = (String)fprop.get("source");
String inheritedSource = (String)fprop.get("inherited");
@@ -278,22 +279,21 @@ public class ExecutableFlowBase extends ExecutableNode {
return updateData;
}
- public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
+ public void applyUpdateObject(TypedMapWrapper<String, Object> updateData, List<ExecutableNode> updatedNodes) {
super.applyUpdateObject(updateData);
if (updatedNodes != null) {
updatedNodes.add(this);
}
-
- @SuppressWarnings("unchecked")
- List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+
+ List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.<Map<String,Object>>getList(NODES_PARAM);
if (nodes != null) {
for (Map<String,Object> node: nodes) {
-
- String id = (String)node.get(ID_PARAM);
+ TypedMapWrapper<String,Object> nodeWrapper = new TypedMapWrapper<String,Object>(node);
+ String id = nodeWrapper.getString(ID_PARAM);
if (id == null) {
// Legacy case
- id = (String)node.get("jobId");
+ id = nodeWrapper.getString("jobId");
}
ExecutableNode exNode = executableNodes.get(id);
@@ -302,18 +302,24 @@ public class ExecutableFlowBase extends ExecutableNode {
}
if (exNode instanceof ExecutableFlowBase) {
- ((ExecutableFlowBase)exNode).applyUpdateObject(node, updatedNodes);
+ ((ExecutableFlowBase)exNode).applyUpdateObject(nodeWrapper, updatedNodes);
}
else {
- exNode.applyUpdateObject(node);
+ exNode.applyUpdateObject(nodeWrapper);
}
}
}
-
}
+ public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
+ TypedMapWrapper<String, Object> typedMapWrapper = new TypedMapWrapper<String,Object>(updateData);
+ applyUpdateObject(typedMapWrapper, updatedNodes);
+ }
+
+ @Override
public void applyUpdateObject(Map<String, Object> updateData) {
- applyUpdateObject(updateData, null);
+ TypedMapWrapper<String, Object> typedMapWrapper = new TypedMapWrapper<String,Object>(updateData);
+ applyUpdateObject(typedMapWrapper, null);
}
public void reEnableDependents(ExecutableNode ... nodes) {
src/java/azkaban/executor/ExecutableNode.java 85(+41 -44)
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 79942e9..0683c38 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -2,15 +2,16 @@ package azkaban.executor;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import azkaban.flow.Node;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
+import azkaban.utils.TypedMapWrapper;
/**
* Base Executable that nodes and flows are based.
@@ -291,38 +292,30 @@ public class ExecutableNode {
}
@SuppressWarnings("unchecked")
- public void fillExecutableFromMapObject(Map<String,Object> objMap) {
- this.id = (String)objMap.get(ID_PARAM);
- this.status = Status.valueOf((String)objMap.get(STATUS_PARAM));
- this.startTime = JSONUtils.getLongFromObject(objMap.get(STARTTIME_PARAM));
- this.endTime = JSONUtils.getLongFromObject(objMap.get(ENDTIME_PARAM));
- this.updateTime = JSONUtils.getLongFromObject(objMap.get(UPDATETIME_PARAM));
- this.type = (String)objMap.get(TYPE_PARAM);
- this.attempt = (Integer)objMap.get(ATTEMPT_PARAM);
+ public void fillExecutableFromMapObject(TypedMapWrapper<String, Object> wrappedMap) {
+ this.id = wrappedMap.getString(ID_PARAM);
+ this.type = wrappedMap.getString(TYPE_PARAM);
+ this.status = Status.valueOf(wrappedMap.getString(STATUS_PARAM));
+ this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
+ this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
+ this.updateTime = wrappedMap.getLong(UPDATETIME_PARAM);
+ this.attempt = wrappedMap.getInt(ATTEMPT_PARAM, 0);
+
+ this.inNodes = new HashSet<String>();
+ this.inNodes.addAll(wrappedMap.getStringCollection(INNODES_PARAM, Collections.<String>emptySet()));
- if (objMap.containsKey(INNODES_PARAM)) {
- this.inNodes = new HashSet<String>();
- this.inNodes.addAll((Collection<String>)objMap.get(INNODES_PARAM));
- }
+ this.outNodes = new HashSet<String>();
+ this.outNodes.addAll(wrappedMap.getStringCollection(OUTNODES_PARAM, Collections.<String>emptySet()));
- if (objMap.containsKey(OUTNODES_PARAM)) {
- this.outNodes = new HashSet<String>();
- this.outNodes.addAll((Collection<String>)objMap.get(OUTNODES_PARAM));
- }
-
- if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
- this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
- }
+ this.propsSource = wrappedMap.getString(PROPS_SOURCE_PARAM);
+ this.jobSource = wrappedMap.getString(JOB_SOURCE_PARAM);
- if (objMap.containsKey(JOB_SOURCE_PARAM)) {
- this.jobSource = (String)objMap.get(JOB_SOURCE_PARAM);
+ Map<String, String> outputProps = wrappedMap.<String,String>getMap(OUTPUT_PROPS_PARAM);
+ if (outputProps != null) {
+ this.outputProps = new Props(null, outputProps);
}
- if (objMap.containsKey(OUTPUT_PROPS_PARAM)) {
- this.outputProps = new Props(null, (Map<String,String>)objMap.get(OUTPUT_PROPS_PARAM));
- }
-
- Collection<Object> pastAttempts = (Collection<Object>)objMap.get(PASTATTEMPTS_PARAM);
+ Collection<Object> pastAttempts = wrappedMap.<Object>getCollection(PASTATTEMPTS_PARAM);
if (pastAttempts!=null) {
ArrayList<ExecutionAttempt> attempts = new ArrayList<ExecutionAttempt>();
for (Object attemptObj: pastAttempts) {
@@ -334,6 +327,11 @@ public class ExecutableNode {
}
}
+ public void fillExecutableFromMapObject(Map<String,Object> objMap) {
+ TypedMapWrapper<String, Object> wrapper = new TypedMapWrapper<String, Object>(objMap);
+ fillExecutableFromMapObject(wrapper);
+ }
+
public Map<String, Object> toUpdateObject() {
Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
updatedNodeMap.put(ID_PARAM, getId());
@@ -355,28 +353,25 @@ public class ExecutableNode {
return updatedNodeMap;
}
- @SuppressWarnings("unchecked")
- public void applyUpdateObject(Map<String, Object> updateData) {
- if (updateData.containsKey(STATUS_PARAM)) {
- this.status = Status.fromInteger((Integer)updateData.get(STATUS_PARAM));
- }
- if (updateData.containsKey(STARTTIME_PARAM)) {
- this.startTime = JSONUtils.getLongFromObject(updateData.get(STARTTIME_PARAM));
- }
- if (updateData.containsKey(UPDATETIME_PARAM)) {
- this.updateTime = JSONUtils.getLongFromObject(updateData.get(UPDATETIME_PARAM));
- }
- if (updateData.containsKey(ENDTIME_PARAM)) {
- this.endTime = JSONUtils.getLongFromObject(updateData.get(ENDTIME_PARAM));
- }
+ public void applyUpdateObject(TypedMapWrapper<String, Object> updateData) {
+ this.status = Status.fromInteger(updateData.getInt(STATUS_PARAM, this.status.getNumVal()));
+ this.startTime = updateData.getLong(STARTTIME_PARAM);
+ this.updateTime = updateData.getLong(UPDATETIME_PARAM);
+ this.endTime = updateData.getLong(ENDTIME_PARAM);
if (updateData.containsKey(ATTEMPT_PARAM)) {
- attempt = (Integer)updateData.get(ATTEMPT_PARAM);
+ attempt = updateData.getInt(ATTEMPT_PARAM);
if (attempt > 0) {
- updatePastAttempts((List<Object>)updateData.get(PASTATTEMPTS_PARAM));
+ updatePastAttempts(
+ updateData.<Object>getList(PASTATTEMPTS_PARAM, Collections.<Object>emptyList()));
}
}
}
+
+ public void applyUpdateObject(Map<String, Object> updateData) {
+ TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(updateData);
+ applyUpdateObject(wrapper);
+ }
public void killNode(long killTime) {
if (this.status == Status.DISABLED) {
@@ -417,4 +412,6 @@ public class ExecutableNode {
}
}
}
+
+
}
diff --git a/src/java/azkaban/executor/ExecutionAttempt.java b/src/java/azkaban/executor/ExecutionAttempt.java
index 7da0623..7712010 100644
--- a/src/java/azkaban/executor/ExecutionAttempt.java
+++ b/src/java/azkaban/executor/ExecutionAttempt.java
@@ -3,7 +3,7 @@ package azkaban.executor;
import java.util.HashMap;
import java.util.Map;
-import azkaban.utils.JSONUtils;
+import azkaban.utils.TypedMapWrapper;
public class ExecutionAttempt {
public static final String ATTEMPT_PARAM = "attempt";
@@ -49,10 +49,11 @@ public class ExecutionAttempt {
public static ExecutionAttempt fromObject(Object obj) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>)obj;
- int attempt = (Integer)map.get(ATTEMPT_PARAM);
- long startTime = JSONUtils.getLongFromObject(map.get(STARTTIME_PARAM));
- long endTime = JSONUtils.getLongFromObject(map.get(ENDTIME_PARAM));
- Status status = Status.valueOf((String)map.get(STATUS_PARAM));
+ TypedMapWrapper<String, Object> wrapper = new TypedMapWrapper<String, Object>(map);
+ int attempt = wrapper.getInt(ATTEMPT_PARAM);
+ long startTime = wrapper.getLong(STARTTIME_PARAM);
+ long endTime = wrapper.getLong(ENDTIME_PARAM);
+ Status status = Status.valueOf(wrapper.getString(STATUS_PARAM));
return new ExecutionAttempt(attempt, startTime, endTime, status);
}
src/java/azkaban/executor/ExecutionOptions.java 52(+20 -32)
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index cedc3f0..0148ab3 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -2,12 +2,15 @@ package azkaban.executor;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import azkaban.utils.TypedMapWrapper;
+
/**
* Execution options for submitted flows and scheduled flows
*/
@@ -179,50 +182,35 @@ public class ExecutionOptions {
}
Map<String,Object> optionsMap = (Map<String,Object>)obj;
+ TypedMapWrapper<String,Object> wrapper = new TypedMapWrapper<String,Object>(optionsMap);
ExecutionOptions options = new ExecutionOptions();
if (optionsMap.containsKey(FLOW_PARAMETERS)) {
options.flowParameters = new HashMap<String, String>();
- options.flowParameters.putAll((Map<String,String>)optionsMap.get(FLOW_PARAMETERS));
+ options.flowParameters.putAll(wrapper.<String,String>getMap(FLOW_PARAMETERS));
}
// Failure notification
- if (optionsMap.containsKey(NOTIFY_ON_FIRST_FAILURE)) {
- options.notifyOnFirstFailure = (Boolean)optionsMap.get(NOTIFY_ON_FIRST_FAILURE);
- }
- if (optionsMap.containsKey(NOTIFY_ON_LAST_FAILURE)) {
- options.notifyOnLastFailure = (Boolean)optionsMap.get(NOTIFY_ON_LAST_FAILURE);
- }
- if (optionsMap.containsKey(CONCURRENT_OPTION)) {
- options.concurrentOption = (String)optionsMap.get(CONCURRENT_OPTION);
- }
- if (optionsMap.containsKey(DISABLE)) {
- options.initiallyDisabledJobs = new HashSet<String>((Collection<String>)optionsMap.get(DISABLE));
+ options.notifyOnFirstFailure = wrapper.getBool(NOTIFY_ON_FIRST_FAILURE, options.notifyOnFirstFailure);
+ options.notifyOnLastFailure = wrapper.getBool(NOTIFY_ON_LAST_FAILURE, options.notifyOnLastFailure);
+ options.concurrentOption = wrapper.getString(CONCURRENT_OPTION, options.concurrentOption);
+
+ if (wrapper.containsKey(DISABLE)) {
+ options.initiallyDisabledJobs = new HashSet<String>(wrapper.<String>getCollection(DISABLE));
}
// Failure action
- if (optionsMap.containsKey(FAILURE_ACTION)) {
- options.failureAction = FailureAction.valueOf((String)optionsMap.get(FAILURE_ACTION));
- }
- options.pipelineLevel = (Integer)optionsMap.get(PIPELINE_LEVEL);
- options.pipelineExecId = (Integer)optionsMap.get(PIPELINE_EXECID);
- options.queueLevel = (Integer)optionsMap.get(QUEUE_LEVEL);
+ options.failureAction = FailureAction.valueOf(wrapper.getString(FAILURE_ACTION, options.failureAction.toString()));
+ options.pipelineLevel = wrapper.getInt(PIPELINE_LEVEL, options.pipelineLevel);
+ options.pipelineExecId = wrapper.getInt(PIPELINE_EXECID, options.pipelineExecId);
+ options.queueLevel = wrapper.getInt(QUEUE_LEVEL, options.queueLevel);
+
// Success emails
- if (optionsMap.containsKey(SUCCESS_EMAILS)) {
- options.setSuccessEmails((List<String>)optionsMap.get(SUCCESS_EMAILS));
- }
- // Failure emails
- if (optionsMap.containsKey(FAILURE_EMAILS)) {
- options.setFailureEmails((List<String>)optionsMap.get(FAILURE_EMAILS));
- }
+ options.setSuccessEmails(wrapper.<String>getList(SUCCESS_EMAILS, Collections.<String>emptyList()));
+ options.setFailureEmails(wrapper.<String>getList(FAILURE_EMAILS, Collections.<String>emptyList()));
- if (optionsMap.containsKey(SUCCESS_EMAILS_OVERRIDE)) {
- options.setSuccessEmailsOverridden((Boolean)optionsMap.get(SUCCESS_EMAILS_OVERRIDE));
- }
-
- if (optionsMap.containsKey(FAILURE_EMAILS_OVERRIDE)) {
- options.setFailureEmailsOverridden((Boolean)optionsMap.get(FAILURE_EMAILS_OVERRIDE));
- }
+ options.setSuccessEmailsOverridden(wrapper.getBool(SUCCESS_EMAILS_OVERRIDE, false));
+ options.setFailureEmailsOverridden(wrapper.getBool(FAILURE_EMAILS_OVERRIDE, false));
return options;
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 29a93a3..0faf4ab 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -826,7 +826,7 @@ public class ExecutorManager {
Pair<ExecutionReference, ExecutableFlow> refPair = this.runningFlows.get(execId);
if (refPair == null) {
- throw new ExecutorManagerException("No running flow found with the execution id.");
+ throw new ExecutorManagerException("No running flow found with the execution id. Removing " + execId);
}
ExecutionReference ref = refPair.getFirst();
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index d354e65..d427ea5 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -379,7 +379,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
// if the main flow is not the parent, then we'll create a composite key for flowID
if (flow != node.getParentFlow()) {
- flowId = flow.getId() + "+" + node.getParentFlow().getPrintableId("+");
+ flowId = node.getParentFlow().getNestedId();
}
QueryRunner runner = createQueryRunner();
@@ -426,6 +426,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
node.getStatus().getNumVal(),
outputParam,
node.getExecutableFlow().getExecutionId(),
+ node.getParentFlow().getNestedId(),
node.getId(),
node.getAttempt());
} catch (SQLException e) {
src/java/azkaban/utils/TypedMapWrapper.java 141(+141 -0)
diff --git a/src/java/azkaban/utils/TypedMapWrapper.java b/src/java/azkaban/utils/TypedMapWrapper.java
new file mode 100644
index 0000000..cce512b
--- /dev/null
+++ b/src/java/azkaban/utils/TypedMapWrapper.java
@@ -0,0 +1,141 @@
+package azkaban.utils;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class TypedMapWrapper<K, V> {
+ private Map<K,V> map;
+ public TypedMapWrapper(Map<K, V> map) {
+ this.map = map;
+ }
+
+ public String getString(K key) {
+ return getString(key, null);
+ }
+
+ public String getString(K key, String defaultVal) {
+ Object obj = map.get(key);
+ if (obj == null) {
+ return defaultVal;
+ }
+ if (obj instanceof String) {
+ return (String)obj;
+ }
+
+ return obj.toString();
+ }
+
+ public Boolean getBool(K key, Boolean defaultVal) {
+ Object obj = map.get(key);
+ if (obj == null) {
+ return defaultVal;
+ }
+
+ return (Boolean)obj;
+ }
+
+ public Integer getInt(K key) {
+ return getInt(key, -1);
+ }
+
+ public Integer getInt(K key, Integer defaultVal) {
+ Object obj = map.get(key);
+ if (obj == null) {
+ return defaultVal;
+ }
+ if (obj instanceof Integer) {
+ return (Integer)obj;
+ }
+ else if (obj instanceof String) {
+ return Integer.valueOf((String)obj);
+ }
+ else {
+ return defaultVal;
+ }
+ }
+
+ public Long getLong(K key) {
+ return getLong(key, -1l);
+ }
+
+ public Long getLong(K key, Long defaultVal) {
+ Object obj = map.get(key);
+ if (obj == null) {
+ return defaultVal;
+ }
+ if (obj instanceof Long) {
+ return (Long)obj;
+ }
+ else if (obj instanceof Integer) {
+ return Long.valueOf((Integer)obj);
+ }
+ else if (obj instanceof String) {
+ return Long.valueOf((String)obj);
+ }
+ else {
+ return defaultVal;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<String> getStringCollection(K key) {
+ Object obj = map.get(key);
+ return (Collection<String>)obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<String> getStringCollection(K key, Collection<String> defaultVal) {
+ Object obj = map.get(key);
+ if (obj == null) {
+ return defaultVal;
+ }
+
+ return (Collection<String>)obj;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public <C> Collection<C> getCollection(K key) {
+ Object obj = map.get(key);
+ if (obj instanceof Collection) {
+ return (Collection<C>)obj;
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <L> List<L> getList(K key) {
+ Object obj = map.get(key);
+ if (obj instanceof List) {
+ return (List<L>)obj;
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <L> List<L> getList(K key, List<L> defaultVal) {
+ Object obj = map.get(key);
+ if (obj instanceof List) {
+ return (List<L>)obj;
+ }
+ return defaultVal;
+ }
+
+ public Object getObject(K key) {
+ return map.get(key);
+ }
+
+ public Map<K, V> getMap() {
+ return map;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <S, T> Map<S,T> getMap(K key) {
+ return (Map<S,T>)map.get(key);
+ }
+
+ public boolean containsKey(K key) {
+ return map.containsKey(key);
+ }
+}
src/java/azkaban/webapp/servlet/ExecutorServlet.java 171(+130 -41)
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4cf171a..785adbf 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -28,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionAttempt;
import azkaban.executor.ExecutionOptions;
@@ -690,51 +692,58 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("resume", e.getMessage());
}
}
-
- private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req,
- HttpServletResponse resp, HashMap<String, Object> ret, User user,
- ExecutableFlow exFlow) throws ServletException {
- Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
- System.out.println("Fetching " + exFlow.getExecutionId());
-
- Project project = getProjectAjaxByPermission(ret,
- exFlow.getProjectId(), user, Type.READ);
- if (project == null) {
- return;
- }
-
+
+ private long fillUpdateExecutableFlowInfo(ExecutableFlowBase flow, long lastUpdateTime, HashMap<String, Object> ret) {
// Just update the nodes and flow states
ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
- for (ExecutableNode node : exFlow.getExecutableNodes()) {
- if (node.getUpdateTime() <= lastUpdateTime) {
+ HashMap<String, Map<String,Object>> nodeMap = new HashMap<String, Map<String,Object>>();
+
+ long updateTime = flow.getUpdateTime();
+ for (ExecutableNode node : flow.getExecutableNodes()) {
+ HashMap<String, Object> nodeObj = null;
+ if (node instanceof ExecutableFlowBase) {
+ nodeObj = new HashMap<String, Object>();
+ long subUpdateTime = fillUpdateExecutableFlowInfo((ExecutableFlowBase)node, lastUpdateTime, nodeObj);
+ updateTime = Math.max(updateTime, subUpdateTime);
+ if (updateTime <= lastUpdateTime) {
+ continue;
+ }
+ }
+ else if (node.getUpdateTime() <= lastUpdateTime){
continue;
}
-
- HashMap<String, Object> nodeObj = new HashMap<String, Object>();
- nodeObj.put("id", node.getId());
- nodeObj.put("status", node.getStatus());
- nodeObj.put("startTime", node.getStartTime());
- nodeObj.put("endTime", node.getEndTime());
- nodeObj.put("attempt", node.getAttempt());
-
- if (node.getAttempt() > 0) {
- nodeObj.put("pastAttempts", node.getAttemptObjects());
+ else {
+ nodeObj = new HashMap<String, Object>();
+ updateTime = Math.max(updateTime, node.getUpdateTime());
+
+ nodeObj.put("id", node.getId());
+ nodeObj.put("status", node.getStatus());
+ nodeObj.put("startTime", node.getStartTime());
+ nodeObj.put("endTime", node.getEndTime());
+ nodeObj.put("updateTime", node.getUpdateTime());
+ nodeObj.put("attempt", node.getAttempt());
+
+ if (node.getAttempt() > 0) {
+ nodeObj.put("pastAttempts", node.getAttemptObjects());
+ }
}
-
+
+ nodeMap.put(node.getId(), nodeObj);
nodeList.add(nodeObj);
}
ret.put("nodes", nodeList);
- ret.put("status", exFlow.getStatus().toString());
- ret.put("startTime", exFlow.getStartTime());
- ret.put("endTime", exFlow.getEndTime());
- ret.put("submitTime", exFlow.getSubmitTime());
- ret.put("updateTime", exFlow.getUpdateTime());
+ ret.put("status", flow.getStatus().toString());
+ ret.put("startTime", flow.getStartTime());
+ ret.put("endTime", flow.getEndTime());
+ ret.put("updateTime", updateTime);
+ return updateTime;
}
-
- private void ajaxFetchExecutableFlow(HttpServletRequest req,
+
+ private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req,
HttpServletResponse resp, HashMap<String, Object> ret, User user,
ExecutableFlow exFlow) throws ServletException {
+ Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
System.out.println("Fetching " + exFlow.getExecutionId());
Project project = getProjectAjaxByPermission(ret,
@@ -742,16 +751,27 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (project == null) {
return;
}
+
+ fillUpdateExecutableFlowInfo(exFlow, lastUpdateTime, ret);
+ }
+ private long fillExecutableFlowInfo(ExecutableFlowBase flow, HashMap<String, Object> ret) {
+ long updateTime = flow.getUpdateTime();
+
ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String, Object>>();
- for (ExecutableNode node : exFlow.getExecutableNodes()) {
+
+ ArrayList<String> executorQueue = new ArrayList<String>();
+ executorQueue.addAll(flow.getStartNodes());
+
+ for (ExecutableNode node : flow.getExecutableNodes()) {
HashMap<String, Object> nodeObj = new HashMap<String, Object>();
nodeObj.put("id", node.getId());
nodeObj.put("status", node.getStatus());
nodeObj.put("startTime", node.getStartTime());
nodeObj.put("endTime", node.getEndTime());
-
+ nodeObj.put("type", node.getType());
+
// Add past attempts
if (node.getPastAttemptList() != null) {
ArrayList<Object> pastAttempts = new ArrayList<Object>();
@@ -760,9 +780,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
nodeObj.put("pastAttempts", pastAttempts);
}
-
+
nodeList.add(nodeObj);
-
+
// Add edges
for (String out : node.getOutNodes()) {
HashMap<String, Object> edgeObj = new HashMap<String, Object>();
@@ -770,15 +790,84 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
edgeObj.put("target", out);
edgeList.add(edgeObj);
}
+
+ // If it's an embedded flow, add the embedded flow info
+ if (node instanceof ExecutableFlowBase) {
+ long subUpdateTime = fillExecutableFlowInfo((ExecutableFlowBase)node, nodeObj);
+ updateTime = Math.max(updateTime, subUpdateTime);
+ }
+ else {
+ nodeObj.put("updateTime", updateTime);
+ }
}
-
+
ret.put("nodes", nodeList);
ret.put("edges", edgeList);
- ret.put("status", exFlow.getStatus().toString());
- ret.put("startTime", exFlow.getStartTime());
- ret.put("endTime", exFlow.getEndTime());
+ ret.put("status", flow.getStatus().toString());
+ ret.put("startTime", flow.getStartTime());
+ ret.put("endTime", flow.getEndTime());
+ ret.put("updateTime", updateTime);
+ return updateTime;
+ }
+
+ private void ajaxFetchExecutableFlow(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> ret, User user,
+ ExecutableFlow exFlow) throws ServletException {
+ System.out.println("Fetching " + exFlow.getExecutionId());
+
+ Project project = getProjectAjaxByPermission(ret,
+ exFlow.getProjectId(), user, Type.READ);
+ if (project == null) {
+ return;
+ }
+
+ fillExecutableFlowInfo(exFlow, ret);
ret.put("submitTime", exFlow.getSubmitTime());
ret.put("submitUser", exFlow.getSubmitUser());
+//
+//
+// ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+// ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String, Object>>();
+// for (ExecutableNode node : exFlow.getExecutableNodes()) {
+// HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+// nodeObj.put("id", node.getId());
+// nodeObj.put("status", node.getStatus());
+// nodeObj.put("startTime", node.getStartTime());
+// nodeObj.put("endTime", node.getEndTime());
+// nodeObj.put("type", node.getType());
+//
+// // Add past attempts
+// if (node.getPastAttemptList() != null) {
+// ArrayList<Object> pastAttempts = new ArrayList<Object>();
+// for (ExecutionAttempt attempt : node.getPastAttemptList()) {
+// pastAttempts.add(attempt.toObject());
+// }
+// nodeObj.put("pastAttempts", pastAttempts);
+// }
+//
+// nodeList.add(nodeObj);
+//
+// // Add edges
+// for (String out : node.getOutNodes()) {
+// HashMap<String, Object> edgeObj = new HashMap<String, Object>();
+// edgeObj.put("from", node.getId());
+// edgeObj.put("target", out);
+// edgeList.add(edgeObj);
+// }
+//
+// // If it's an embedded flow, add the embedded flow info
+// if (node instanceof ExecutableFlowBase) {
+//
+// }
+// }
+//
+// ret.put("nodes", nodeList);
+// ret.put("edges", edgeList);
+// ret.put("status", exFlow.getStatus().toString());
+// ret.put("startTime", exFlow.getStartTime());
+// ret.put("endTime", exFlow.getEndTime());
+// ret.put("submitTime", exFlow.getSubmitTime());
+// ret.put("submitUser", exFlow.getSubmitUser());
}
private void ajaxAttemptExecuteFlow(HttpServletRequest req,
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 4abde57..2a4458d 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -28,6 +28,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -511,7 +512,46 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private void ajaxFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
String flowId = getParam(req, "flow");
- fillFlowInfo(project, flowId, ret);
+ fillFlowInfo2(project, flowId, ret);
+ }
+
+ private void fillFlowInfo2(Project project, String flowId, HashMap<String, Object> ret) {
+ Flow flow = project.getFlow(flowId);
+
+ ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+ for (Node node: flow.getNodes()) {
+ HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+ nodeObj.put("id", node.getId());
+ nodeObj.put("type", node.getType());
+ if (node.getEmbeddedFlowId() != null) {
+ nodeObj.put("flowId", node.getEmbeddedFlowId());
+// HashMap<String, Object> embeddedNodeObj = new HashMap<String, Object>();
+// fillFlowInfo2(project, node.getEmbeddedFlowId(), embeddedNodeObj);
+// nodeObj.put("flowData", embeddedNodeObj);
+ }
+
+ nodeList.add(nodeObj);
+ Set<Edge> inEdges = flow.getInEdges(node.getId());
+ if (inEdges != null && !inEdges.isEmpty()) {
+ ArrayList<String> inEdgesList = new ArrayList<String>();
+ for (Edge edge: inEdges) {
+ inEdgesList.add(edge.getSourceId());
+ }
+ Collections.sort(inEdgesList);
+ nodeObj.put("in", inEdgesList);
+ }
+ }
+
+ Collections.sort(nodeList, new Comparator<Map<String, Object>>() {
+ @Override
+ public int compare(Map<String, Object> o1, Map<String, Object> o2) {
+ String id = (String)o1.get("id");
+ return id.compareTo((String)o2.get("id"));
+ }
+ });
+
+ ret.put("flow", flowId);
+ ret.put("nodes", nodeList);
}
private void fillFlowInfo(Project project, String flowId, HashMap<String, Object> ret) {
@@ -584,7 +624,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
if (node.getType().equals("flow")) {
if (node.getEmbeddedFlowId() != null) {
HashMap<String, Object> flowMap = new HashMap<String, Object>();
- fillFlowInfo(project, node.getEmbeddedFlowId(), flowMap);
+ fillFlowInfo2(project, node.getEmbeddedFlowId(), flowMap);
ret.put("flowData", flowMap);
}
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 7a03448..0bca980 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -31,6 +31,7 @@
<script type="text/javascript" src="${context}/js/azkaban.job.status.utils.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.svg.exflow.helper.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.flow.job.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.svg.graph.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.exflow.view.js"></script>
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 82101cb..67cde66 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -31,7 +31,7 @@
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
- <script type="text/javascript" src="${context}/js/azkaban.svg.graph.helper.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.svg.flow.loader.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.svg.graph.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.flow.extended.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.flow.job.view.js"></script>
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 8a608f6..c57c2f7 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -27,6 +27,7 @@
<script type="text/javascript" src="${context}/js/jquery.simplemodal-1.4.4.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.svg.graph.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.project.view.js"></script>
<link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui-1.10.1.custom.css" />
src/web/js/azkaban.exflow.view.js 89(+6 -83)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 388c42e..8e3c8d0 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -627,54 +627,6 @@ var logUpdaterFunction = function() {
}
}
-var exNodeClickCallback = function(event) {
- console.log("Node clicked callback");
- var jobId = event.currentTarget.jobid;
- var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
-
- var menu = [
- {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
- {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
- {break: 1},
- {title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
- ];
-
- contextMenuView.show(event, menu);
-}
-
-var exJobClickCallback = function(event) {
- console.log("Node clicked callback");
- var jobId = event.currentTarget.jobid;
- var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
-
- var menu = [
- {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
- {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
- {break: 1},
- {title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
- ];
-
- contextMenuView.show(event, menu);
-}
-
-var exEdgeClickCallback = function(event) {
- console.log("Edge clicked callback");
-}
-
-var exGraphClickCallback = function(event) {
- console.log("Graph clicked callback");
- var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId;
-
- var menu = [
- {title: "Open Flow...", callback: function() {window.location.href=requestURL;}},
- {title: "Open Flow in New Window...", callback: function() {window.open(requestURL);}},
- {break: 1},
- {title: "Center Graph", callback: function() {graphModel.trigger("resetPanZoom");}}
- ];
-
- contextMenuView.show(event, menu);
-}
-
var attemptRightClick = function(event) {
var target = event.currentTarget;
var job = target.job;
@@ -699,8 +651,8 @@ $(function() {
logModel = new azkaban.LogModel();
flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), model: graphModel});
- mainSvgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: { "node": exNodeClickCallback, "edge": exEdgeClickCallback, "graph": exGraphClickCallback }});
- jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, contextMenuCallback: exJobClickCallback});
+ mainSvgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: { "node": nodeClickCallback, "edge": edgeClickCallback, "graph": graphClickCallback }});
+ jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, contextMenuCallback: nodeClickCallback});
flowLogView = new azkaban.FlowLogView({el:$('#flowLogView'), model: logModel});
statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});
@@ -712,40 +664,11 @@ $(function() {
requestURL,
{"execid": execId, "ajax":"fetchexecflow"},
function(data) {
- console.log("data fetched");
- graphModel.set({data: data});
- graphModel.set({disabled: {}});
+ console.log("data fetched");
+ createModelFromAjaxCall(data, graphModel);
graphModel.trigger("change:graph");
-
- updateTime = Math.max(updateTime, data.submitTime);
- updateTime = Math.max(updateTime, data.startTime);
- updateTime = Math.max(updateTime, data.endTime);
-
- var nodeMap = {};
- for (var i = 0; i < data.nodes.length; ++i) {
- var node = data.nodes[i];
- nodeMap[node.id] = node;
- updateTime = Math.max(updateTime, node.startTime);
- updateTime = Math.max(updateTime, node.endTime);
- }
- for (var i = 0; i < data.edges.length; ++i) {
- var edge = data.edges[i];
-
- if (!nodeMap[edge.target].in) {
- nodeMap[edge.target].in = {};
- }
- var targetInMap = nodeMap[edge.target].in;
- targetInMap[edge.from] = nodeMap[edge.from];
-
- if (!nodeMap[edge.from].out) {
- nodeMap[edge.from].out = {};
- }
- var sourceOutMap = nodeMap[edge.from].out;
- sourceOutMap[edge.target] = nodeMap[edge.target];
- }
-
- graphModel.set({nodeMap: nodeMap});
-
+ updateTime = data.updateTime;
+
if (window.location.hash) {
var hash = window.location.hash;
if (hash == "#jobslist") {
diff --git a/src/web/js/azkaban.flow.extended.view.js b/src/web/js/azkaban.flow.extended.view.js
index 21985b3..aa38d34 100644
--- a/src/web/js/azkaban.flow.extended.view.js
+++ b/src/web/js/azkaban.flow.extended.view.js
@@ -6,9 +6,7 @@ azkaban.FlowExtendedViewPanel = Backbone.View.extend({
//this.model.bind('change:flowinfo', this.changeFlowInfo, this);
$(this.el).show();
$(this.el).draggable({cancel: ".dataContent", containment: "document"});
-
- this.extendedViewPanels = {};
- this.extendedDataModels = {};
+
this.render();
$(this.el).hide();
},
@@ -52,10 +50,7 @@ azkaban.FlowExtendedViewPanel = Backbone.View.extend({
$(svgDataFlow).append(svgGraph);
$(svgDataFlow).resizable();
- this.innerGraphModel = new azkaban.GraphModel();
- this.innerGraphModel.set({"data": this.model.get("flow")});
-
- this.graphView = new azkaban.SvgGraphView({el: svgDataFlow, model: this.innerGraphModel, render: true, rightClick: { "node": nodeClickCallback, "graph": graphClickCallback }})
+ this.graphView = new azkaban.SvgGraphView({el: svgDataFlow, model: this.model, render: true, rightClick: { "node": nodeClickCallback, "graph": graphClickCallback }})
}
else {
$(this.el).find(".dataFlow").hide();
src/web/js/azkaban.flow.view.js 3(+2 -1)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 73945f0..010291a 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -298,6 +298,7 @@ $(function() {
// Set up the Flow options view. Create a new one every time :p
$('#executebtn').click( function() {
+ closeAllSubDisplays();
var data = graphModel.get("data");
var nodes = data.nodes;
@@ -314,7 +315,7 @@ $(function() {
requestURL,
{"project": projectName, "ajax":"fetchflowgraph", "flow":flowId},
function(data) {
- createModelFromAjaxCall(data, graphModel);
+ parseFlowData(data, graphModel);
graphModel.trigger("change:graph");
// Handle the hash changes here so the graph finishes rendering first.
src/web/js/azkaban.layout.js 85(+72 -13)
diff --git a/src/web/js/azkaban.layout.js b/src/web/js/azkaban.layout.js
index 8ebe486..07debec 100644
--- a/src/web/js/azkaban.layout.js
+++ b/src/web/js/azkaban.layout.js
@@ -4,16 +4,59 @@ var degreeRatio = 1/8;
var maxHeight = 200;
var cornerGap = 10;
-function layoutGraph(nodes, edges, hmargin) {
- var startLayer = [];
- var numLayer = 0;
- var nodeMap = {};
-
+var idSort = function(a, b) {
+ if ( a.id < b.id ) {
+ return -1;
+ }
+ else if ( a.id > b.id ) {
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+function prepareLayout(nodes, hmargin, layers, nodeMap) {
var maxLayer = 0;
- var layers = {};
+ var numLayer = 0;
+ var nodeQueue = new Array();
+ // Find start layers first
+ for (var i=0; i < nodes.length; ++i) {
+ var node = nodes[i];
+ if (node.inNodes) {
+ // We sort here. Why? To keep the node drawing consistent
+ node.in.sort(idSort);
+ }
+ else {
+ // We sort here. Why? To keep it up and running.
+ nodeQueue.push(node);
+ }
+ }
+ // Sort here. To keep the node drawing consistent
+ nodes.sort(idSort);
- if (!hmargin) {
- hmargin = 8;
+ // calculate level
+ // breath first search the sucker
+ var index = 0;
+ while(index < nodeQueue.length) {
+ var node = nodeQueue[index];
+ if (node.inNodes) {
+ var level = 0;
+ for (var key in node.inNodes) {
+ level = Math.max(level, node.inNodes[key].level);
+ }
+ node.level = level + 1;
+ }
+ else {
+ node.level = 0;
+ }
+
+ if (node.outNodes) {
+ for (var key in node.outNodes) {
+ nodeQueue.push(node.outNodes[key]);
+ }
+ }
+ index++;
}
// Assign to layers
@@ -33,13 +76,30 @@ function layoutGraph(nodes, edges, hmargin) {
layers[node.level].push(node);
}
+ layers.numLayer = numLayer;
+ layers.maxLayer = maxLayer;
+}
+
+function layoutGraph(nodes, edges, hmargin) {
+ var startLayer = [];
+
+ var nodeMap = {};
+ var layers = {};
+
+ if (!hmargin) {
+ hmargin = 8;
+ }
+
+ prepareLayout(nodes, hmargin, layers, nodeMap);
+ var maxLayer = layers.maxLayer;
+ var numLayer = layers.numLayer;
+
// Create dummy nodes
var edgeDummies = {};
-
for (var i=0; i < edges.length; ++i ) {
var edge = edges[i];
var src = edges[i].from;
- var dest = edges[i].target;
+ var dest = edges[i].to;
var edgeId = src + ">>" + dest;
@@ -96,7 +156,6 @@ function layoutGraph(nodes, edges, hmargin) {
spreadLayerSmart(layers[i]);
}
-
// Space it vertically
spaceVertically(layers, maxLayer);
@@ -107,12 +166,12 @@ function layoutGraph(nodes, edges, hmargin) {
node.x = layerNode.x;
node.y = layerNode.y;
}
-
+
// Dummy node for more points.
for (var i = 0; i < edges.length; ++i) {
var edge = edges[i];
var src = edges[i].from;
- var dest = edges[i].target;
+ var dest = edges[i].to;
var edgeId = src + ">>" + dest;
src/web/js/azkaban.svg.exflow.helper.js 200(+200 -0)
diff --git a/src/web/js/azkaban.svg.exflow.helper.js b/src/web/js/azkaban.svg.exflow.helper.js
new file mode 100644
index 0000000..6604020
--- /dev/null
+++ b/src/web/js/azkaban.svg.exflow.helper.js
@@ -0,0 +1,200 @@
+var extendedViewPanels = {};
+var extendedDataModels = {};
+var openJobDisplayCallback = function(nodeId, flowId, evt) {
+ console.log("Open up data");
+
+ var nodeInfoPanelID = flowId + ":" + nodeId + "-info";
+ if ($("#" + nodeInfoPanelID).length) {
+ $("#flowInfoBase").before(cloneStuff);
+ extendedViewPanels[nodeInfoPanelID].showExtendedView(evt);
+ return;
+ }
+
+ var cloneStuff = $("#flowInfoBase").clone();
+ $(cloneStuff).attr("id", nodeInfoPanelID);
+
+
+
+ /*
+ $("#flowInfoBase").before(cloneStuff);
+ var requestURL = contextURL + "/manager";
+
+ $.get(
+ requestURL,
+ {"project": projectName, "ajax":"fetchflownodedata", "flow":flowId, "node": nodeId},
+ function(data) {
+ var graphModel = new azkaban.GraphModel();
+ graphModel.set({id: data.id, flow: data.flowData, type: data.type, props: data.props});
+
+ var flowData = data.flowData;
+ if (flowData) {
+ createModelFromAjaxCall(flowData, graphModel);
+ }
+
+ var backboneView = new azkaban.FlowExtendedViewPanel({el:cloneStuff, model: graphModel});
+ extendedViewPanels[nodeInfoPanelID] = backboneView;
+ extendedDataModels[nodeInfoPanelID] = graphModel;
+ backboneView.showExtendedView(evt);
+ },
+ "json"
+ );
+ */
+}
+
+var extendedDataModels = {};
+var createModelFromAjaxCall = function(data, model) {
+ var nodes = {};
+ for (var i=0; i < data.nodes.length; ++i) {
+ var graphModel = new azkaban.GraphModel();
+ var node = data.nodes[i];
+ nodes[node.id] = node;
+ }
+
+ for (var i=0; i < data.edges.length; ++i) {
+ var edge = data.edges[i];
+ var fromNode = nodes[edge.from];
+ var toNode = nodes[edge.target];
+
+ if (!fromNode.outNodes) {
+ fromNode.outNodes = {};
+ }
+ fromNode.outNodes[toNode.id] = toNode;
+
+ if (!toNode.inNodes) {
+ toNode.inNodes = {};
+ }
+ toNode.inNodes[fromNode.id] = fromNode;
+ }
+
+ var nodeQueue = new Array();
+ for (var key in nodes) {
+ if (!nodes[key].inNodes) {
+ nodeQueue.push(nodes[key]);
+ }
+ }
+
+ // calculate level
+ // breath first search the sucker
+ var index = 0;
+ while(index < nodeQueue.length) {
+ var node = nodeQueue[index];
+ if (node.inNodes) {
+ var level = 0;
+ for (var key in node.inNodes) {
+ level = Math.max(level, node.inNodes[key].level);
+ }
+ node.level = level + 1;
+ }
+ else {
+ node.level = 0;
+ }
+
+ if (node.outNodes) {
+ for (var key in node.outNodes) {
+ nodeQueue.push(node.outNodes[key]);
+ }
+ }
+ index++;
+ }
+
+ for (var key in nodes) {
+ var node = nodes[key];
+
+ if (node.type == "flow") {
+ var graphModel = new azkaban.GraphModel();
+ createModelFromAjaxCall(node, graphModel);
+ extendedDataModels["test"] = graphModel;
+ }
+ }
+
+ console.log("data fetched");
+ model.set({data: data});
+ model.set({nodes: nodes});
+ model.set({disabled: {}});
+}
+
+var nodeClickCallback = function(event, model, type) {
+ console.log("Node clicked callback");
+ var jobId = event.currentTarget.jobid;
+ var flowId = model.get("flowId");
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+ if (event.currentTarget.jobtype == "flow") {
+ var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+ menu = [
+ {title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+ {title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+ {break: 1},
+ {title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+ ];
+ }
+ else {
+ menu = [
+ {title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Job", callback: function() {model.trigger("centerNode", jobId)}}
+ ];
+ }
+ contextMenuView.show(event, menu);
+}
+
+var jobClickCallback = function(event, model) {
+ console.log("Node clicked callback");
+ var jobId = event.currentTarget.jobid;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+ var menu;
+ if (event.currentTarget.jobtype == "flow") {
+ var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+ menu = [
+ {title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+ {title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+ {break: 1},
+ {title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+ ];
+ }
+ else {
+ menu = [
+ {title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
+ ];
+ }
+ contextMenuView.show(event, menu);
+}
+
+var edgeClickCallback = function(event, model) {
+ console.log("Edge clicked callback");
+}
+
+var graphClickCallback = function(event, model) {
+ console.log("Graph clicked callback");
+ var jobId = event.currentTarget.jobid;
+ var flowId = model.get("flowId");
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId;
+
+ var menu = [
+ {title: "Open Flow...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Flow in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Graph", callback: function() {model.trigger("resetPanZoom");}}
+ ];
+
+ contextMenuView.show(event, menu);
+}
src/web/js/azkaban.svg.flow.loader.js 234(+234 -0)
diff --git a/src/web/js/azkaban.svg.flow.loader.js b/src/web/js/azkaban.svg.flow.loader.js
new file mode 100644
index 0000000..801877c
--- /dev/null
+++ b/src/web/js/azkaban.svg.flow.loader.js
@@ -0,0 +1,234 @@
+var extendedViewPanels = {};
+var extendedDataModels = {};
+var openJobDisplayCallback = function(nodeId, flowId, evt) {
+ console.log("Open up data");
+ var target = evt.currentTarget;
+ var node = target.nodeobj;
+
+ // If target panel exists, than we display and skip.
+ var targetPanel = node.panel;
+ if (targetPanel) {
+ $("#flowInfoBase").before(targetPanel);
+ targetPanel.showExtendedView(evt);
+ }
+ else {
+ var targetModel = node.dataModel;
+ var flowId = flowId;
+
+ if (!targetModel) {
+ var requestURL = contextURL + "/manager";
+ var newParentPath = node.parentPath ? node.parentPath + ":" + flowId : flowId;
+ node.parentPath = newParentPath;
+
+ $.get(
+ requestURL,
+ {"project": projectName, "ajax":"fetchflownodedata", "flow":flowId, "node": node.id},
+ function(data) {
+ var graphModel = new azkaban.GraphModel();
+ graphModel.set({id: data.id, flow: data.flowData, type: data.type, props: data.props});
+
+ var flowData = data.flowData;
+ if (flowData) {
+ parseFlowData(flowData, graphModel, newParentPath);
+ }
+
+ node.dataModel = graphModel;
+ createNewPanel(node, graphModel, evt);
+ },
+ "json"
+ );
+ }
+ else {
+ createNewPanel(node, targetModel, evt);
+ }
+ }
+
+ /*
+ $("#flowInfoBase").before(cloneStuff);
+ var requestURL = contextURL + "/manager";
+
+ $.get(
+ requestURL,
+ {"project": projectName, "ajax":"fetchflownodedata", "flow":flowId, "node": nodeId},
+ function(data) {
+ var graphModel = new azkaban.GraphModel();
+ graphModel.set({id: data.id, flow: data.flowData, type: data.type, props: data.props});
+
+ var flowData = data.flowData;
+ if (flowData) {
+ createModelFromAjaxCall(flowData, graphModel);
+ }
+
+ var backboneView = new azkaban.FlowExtendedViewPanel({el:cloneStuff, model: graphModel});
+ extendedViewPanels[nodeInfoPanelID] = backboneView;
+ extendedDataModels[nodeInfoPanelID] = graphModel;
+ backboneView.showExtendedView(evt);
+ },
+ "json"
+ );
+ */
+}
+
+var createNewPanel = function(node, model, evt) {
+ var parentPath = node.parentPath;
+
+ var nodeInfoPanelID = parentPath ? parentPath + ":" + node.id + "-info" : node.id + "-info";
+ var cloneStuff = $("#flowInfoBase").clone();
+ cloneStuff.nodeobj = node;
+ $(cloneStuff).attr("id", nodeInfoPanelID);
+ $("#flowInfoBase").before(cloneStuff);
+
+ var backboneView = new azkaban.FlowExtendedViewPanel({el:cloneStuff, model: model});
+ node.panel = backboneView;
+ backboneView.showExtendedView(evt);
+}
+
+var parseFlowData = function(data, model, parentPath) {
+ var nodes = {};
+ var edges = new Array();
+ for (var i=0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ nodes[node.id] = node;
+ }
+
+ var nodeQueue = new Array();
+ for (var i=0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ if (node.in) {
+ for (var j=0; j < node.in.length; ++j) {
+ var fromNode = nodes[node.in[j]];
+ if (!fromNode.outNodes) {
+ fromNode.outNodes = {};
+ }
+ if (!node.inNodes) {
+ node.inNodes = {};
+ }
+
+ fromNode.outNodes[node.id] = node;
+ node.inNodes[fromNode.id] = fromNode;
+ edges.push({to: node.id, from: fromNode.id});
+ }
+ }
+ else {
+ // Queue used for breath first.
+ nodeQueue.push(node);
+ }
+ }
+
+ // Iterate over the nodes again
+ var embeddedFlows = {};
+ var newParentPath = parentPath ? parentPath + ":" + data.flow : data.flow;
+
+ for (var key in nodes) {
+ var node = nodes[key];
+ node.parentPath = newParentPath;
+ if (node.type == "flow" && node.flowData) {
+ var graphModel = new azkaban.GraphModel();
+
+ node.flowData.id = node.id;
+ node.flowData.flowId = node.flowId;
+ parseFlowData(node.flowData, graphModel, newParentPath);
+ graphModel.set({id: node.id, flow: node.flowData, type: node.type, props: node.props});
+ graphModel.set({isEmbedded: true});
+ node.dataModel = graphModel;
+ }
+ }
+
+ console.log("data fetched");
+ model.set({flow: data.flow});
+ model.set({data: data});
+ model.set({nodes: nodes});
+ model.set({edges: edges});
+ model.set({disabled: {}});
+}
+
+var closeAllSubDisplays = function() {
+ $(".flowExtendedView").hide();
+}
+
+var nodeClickCallback = function(event, model, type) {
+ console.log("Node clicked callback");
+ var target = event.currentTarget;
+ var jobId = target.jobid;
+ var flowId = model.get("flow");
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+ if (event.currentTarget.jobtype == "flow") {
+ var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+ menu = [
+ {title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+ {title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+ {break: 1},
+ {title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+ ];
+ }
+ else {
+ menu = [
+ {title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Job", callback: function() {model.trigger("centerNode", jobId)}}
+ ];
+ }
+ contextMenuView.show(event, menu);
+}
+
+var jobClickCallback = function(event, model) {
+ console.log("Node clicked callback");
+ var jobId = event.currentTarget.jobid;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+
+ var menu;
+ if (event.currentTarget.jobtype == "flow") {
+ var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + event.currentTarget.flowId;
+ menu = [
+ {title: "View Flow...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
+ {title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
+ {break: 1},
+ {title: "Open Properties...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Properties in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Flow", callback: function() {model.trigger("centerNode", jobId)}}
+ ];
+ }
+ else {
+ menu = [
+ {title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+ {break: 1},
+ {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Job", callback: function() {graphModel.trigger("centerNode", jobId)}}
+ ];
+ }
+ contextMenuView.show(event, menu);
+}
+
+var edgeClickCallback = function(event, model) {
+ console.log("Edge clicked callback");
+}
+
+var graphClickCallback = function(event, model) {
+ console.log("Graph clicked callback");
+ var jobId = event.currentTarget.jobid;
+ var flowId = model.get("flowId");
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId;
+
+ var menu = [
+ {title: "Open Flow...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Flow in New Window...", callback: function() {window.open(requestURL);}},
+ {break: 1},
+ {title: "Center Graph", callback: function() {model.trigger("resetPanZoom");}}
+ ];
+
+ contextMenuView.show(event, menu);
+}
src/web/js/azkaban.svg.graph.view.js 121(+105 -16)
diff --git a/src/web/js/azkaban.svg.graph.view.js b/src/web/js/azkaban.svg.graph.view.js
index 1e0f443..1bd78de 100644
--- a/src/web/js/azkaban.svg.graph.view.js
+++ b/src/web/js/azkaban.svg.graph.view.js
@@ -94,14 +94,12 @@ azkaban.SvgGraphView = Backbone.View.extend({
var data = this.model.get("data");
var nodes = data.nodes;
- var edges = data.edges;
+ var edges = this.model.get("edges");
+
if (nodes.length == 0) {
console.log("No results");
return;
};
-
- nodes.sort();
- edges.sort();
var bounds = {};
this.nodes = {};
@@ -120,10 +118,10 @@ azkaban.SvgGraphView = Backbone.View.extend({
this.moveNodes(bounds);
for (var i = 0; i < edges.length; ++i) {
- var inNodes = this.nodes[edges[i].target].inNodes;
+ var inNodes = this.nodes[edges[i].to].inNodes;
if (!inNodes) {
inNodes = {};
- this.nodes[edges[i].target].inNodes = inNodes;
+ this.nodes[edges[i].to].inNodes = inNodes;
}
inNodes[edges[i].from] = this.nodes[edges[i].from];
@@ -132,12 +130,12 @@ azkaban.SvgGraphView = Backbone.View.extend({
outNodes = {};
this.nodes[edges[i].from].outNodes = outNodes;
}
- outNodes[edges[i].target] = this.nodes[edges[i].target];
+ outNodes[edges[i].to] = this.nodes[edges[i].to];
this.drawEdge(this, edges[i]);
}
- this.model.set({"flowId":data.flowId, "nodes": this.nodes, "edges": edges});
+ this.model.set({"flowId":data.flowId, "edges": edges});
var margin = this.graphMargin;
bounds.minX = bounds.minX ? bounds.minX - margin : -margin;
@@ -260,7 +258,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
var svgns = self.svgns;
var startNode = this.nodes[edge.from];
- var endNode = this.nodes[edge.target];
+ var endNode = this.nodes[edge.to];
var startPointY = startNode.y + startNode.height/2 - 3;
var endPointY = endNode.y - endNode.height/2 + 3;
@@ -276,7 +274,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
var polyLine = document.createElementNS(svgns, "polyline");
polyLine.setAttribute("class", "edge");
polyLine.setAttribute("points", pointString);
- polyLine.setAttribute("style", "fill:none;");
+ polyLine.setAttribute("style", "fill:none;stroke-width:3");
$(self.mainG).prepend(polyLine);
}
else {
@@ -286,7 +284,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
line.setAttribute("y1", startPointY);
line.setAttribute("x2", endNode.x);
line.setAttribute("y2", endPointY);
-
+ line.setAttribute("style", "stroke-width:3");
$(self.mainG).prepend(line);
}
},
@@ -296,7 +294,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
}
else {
this.drawBoxNode(self,node);
- //this.drawCircleNode(self,node,bounds);
+ //this.drawCircleNode2(self,node);
}
},
moveNodes: function(bounds) {
@@ -362,7 +360,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
innerG.jobid = node.id;
innerG.jobtype = "flow";
innerG.flowId = node.flowId;
-
+ innerG.nodeobj = node;
+
nodeG.appendChild(innerG);
self.mainG.appendChild(nodeG);
@@ -400,6 +399,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
nodeG.setAttribute("class", "node");
nodeG.jobid=node.id;
+ nodeG.jobobj=node;
},
drawBoxNode: function(self, node) {
var svg = self.svgGraph;
@@ -432,7 +432,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
innerG.appendChild(rect);
innerG.appendChild(text);
innerG.jobid = node.id;
-
+ innerG.nodeobj = node;
+
nodeG.appendChild(innerG);
self.mainG.appendChild(nodeG);
@@ -462,6 +463,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
nodeG.setAttribute("class", "node");
nodeG.jobid=node.id;
+ nodeG.j=node;
},
drawCircleNode: function(self, node, bounds) {
var svg = self.svgGraph;
@@ -470,6 +472,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
var xOffset = 10;
var yOffset = 10;
+ var height = 18;
var nodeG = document.createElementNS(svgns, "g");
nodeG.setAttribute("class", "jobnode");
nodeG.setAttribute("font-family", "helvetica");
@@ -484,6 +487,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
circle.setAttribute("cx", 10);
circle.setAttribute("r", 12);
circle.setAttribute("style", "width:inherit;stroke-opacity:1");
+
//circle.setAttribute("class", "border");
//circle.setAttribute("class", "nodecontainer");
@@ -492,8 +496,8 @@ azkaban.SvgGraphView = Backbone.View.extend({
text.appendChild(textLabel);
text.setAttribute("x", 0);
text.setAttribute("y", 0);
-
- this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
+
+ //this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
var backRect = document.createElementNS(svgns, 'rect');
backRect.setAttribute("x", 0);
@@ -510,6 +514,9 @@ azkaban.SvgGraphView = Backbone.View.extend({
nodeG.appendChild(innerG);
self.mainG.appendChild(nodeG);
+
+ var horizontalMargin = 8;
+ var verticalMargin = 2;
// Need to get text width after attaching to SVG.
var computeText = text.getComputedTextLength();
var halfWidth = computeText/2;
@@ -517,6 +524,88 @@ azkaban.SvgGraphView = Backbone.View.extend({
backRect.setAttribute("x", -halfWidth);
backRect.setAttribute("width", computeText + 20);
+ // Margin for surrounding box.
+ var boxWidth = computeText + horizontalMargin * 2;
+ var boxHeight = height + verticalMargin * 2;
+
+ node.width = boxWidth;
+ node.height = boxHeight;
+ node.centerX = boxWidth/2;
+ node.centerY = boxHeight/2;
+
+ nodeG.setAttribute("class", "node");
+ nodeG.nodeobj=node;
+ nodeG.jobid=node.id;
+ },
+ drawCircleNode2: function(self, node, bounds) {
+ var svg = self.svgGraph;
+ var svgns = self.svgns;
+
+ var xOffset = 10;
+ var yOffset = 10;
+
+ var height = 18;
+ var nodeG = document.createElementNS(svgns, "g");
+ nodeG.setAttribute("class", "jobnode");
+ nodeG.setAttribute("font-family", "helvetica");
+ nodeG.setAttribute("transform", "translate(" + node.x + "," + node.y + ")");
+ this.gNodes[node.id] = nodeG;
+
+ var innerG = document.createElementNS(svgns, "g");
+ innerG.setAttribute("transform", "translate(-10,-10)");
+
+ var circle = document.createElementNS(svgns, 'circle');
+ circle.setAttribute("cy", 10);
+ circle.setAttribute("cx", 10);
+ circle.setAttribute("r", 12);
+ circle.setAttribute("style", "width:inherit;stroke-opacity:1;stroke:rgb(100,100,100);stroke-width:5");
+ //circle.setAttribute("class", "border");
+ //circle.setAttribute("class", "nodecontainer");
+
+ var text = document.createElementNS(svgns, 'text');
+ var textLabel = document.createTextNode(node.label);
+ text.appendChild(textLabel);
+ text.setAttribute("x", 0);
+ text.setAttribute("y", 0);
+
+ //this.addBounds(bounds, {minX:node.x - xOffset, minY: node.y - yOffset, maxX: node.x + xOffset, maxY: node.y + yOffset});
+
+ var backRect = document.createElementNS(svgns, 'rect');
+ backRect.setAttribute("x", 0);
+ backRect.setAttribute("y", 2);
+ backRect.setAttribute("class", "backboard");
+ backRect.setAttribute("width", 10);
+ backRect.setAttribute("height", 15);
+
+ innerG.appendChild(circle);
+ innerG.appendChild(backRect);
+// innerG.appendChild(text);
+ innerG.jobid = node.id;
+
+ nodeG.appendChild(innerG);
+ self.mainG.appendChild(nodeG);
+
+
+ var horizontalMargin = 8;
+ var verticalMargin = 2;
+ // Need to get text width after attaching to SVG.
+ var computeText = 150;
+ var halfWidth = computeText/2;
+ text.setAttribute("x", -halfWidth + 10);
+ backRect.setAttribute("x", -halfWidth);
+ backRect.setAttribute("width", computeText + 20);
+
+ // Margin for surrounding box.
+ var boxWidth = computeText + horizontalMargin * 2;
+ var boxHeight = height + verticalMargin * 2;
+
+// innerG.removeChild(text);
+
+ node.width = boxWidth;
+ node.height = boxHeight;
+ node.centerX = boxWidth/2;
+ node.centerY = boxHeight/2;
+
nodeG.setAttribute("class", "node");
nodeG.jobid=node.id;
},
unit/build.xml 13(+12 -1)
diff --git a/unit/build.xml b/unit/build.xml
index 64d1314..c9f3f20 100644
--- a/unit/build.xml
+++ b/unit/build.xml
@@ -97,5 +97,16 @@
<zipfileset dir="${base.dir}/unit/executions/embedded" />
</zip>
</target>
-
+
+ <target name="package-embedded3" depends="jars" description="Creates a test zip">
+ <delete dir="${dist.packages.dir}" />
+ <mkdir dir="${dist.packages.dir}" />
+
+ <!-- Tarball it -->
+ <zip destfile="${dist.packages.dir}/embedded.zip">
+ <zipfileset dir="${dist.jar.dir}" />
+ <zipfileset dir="${base.dir}/unit/executions/embedded3" />
+ </zip>
+
+ </target>
</project>
diff --git a/unit/executions/embedded3/innerFlow.job b/unit/executions/embedded3/innerFlow.job
new file mode 100644
index 0000000..e9b3b89
--- /dev/null
+++ b/unit/executions/embedded3/innerFlow.job
@@ -0,0 +1,4 @@
+type=javaprocess
+seconds=1
+fail=false
+dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerFlow2.job b/unit/executions/embedded3/innerFlow2.job
new file mode 100644
index 0000000..2346982
--- /dev/null
+++ b/unit/executions/embedded3/innerFlow2.job
@@ -0,0 +1,4 @@
+type=javaprocess
+seconds=1
+fail=false
+dependencies=innerJobA
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerJobA.job b/unit/executions/embedded3/innerJobA.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embedded3/innerJobA.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/embedded3/innerJobB.job b/unit/executions/embedded3/innerJobB.job
new file mode 100644
index 0000000..24a2e04
--- /dev/null
+++ b/unit/executions/embedded3/innerJobB.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow2
+dependencies=innerJobA
diff --git a/unit/executions/embedded3/innerJobC.job b/unit/executions/embedded3/innerJobC.job
new file mode 100644
index 0000000..178bbef
--- /dev/null
+++ b/unit/executions/embedded3/innerJobC.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=innerJobA
unit/executions/embedded3/joba.job 4(+4 -0)
diff --git a/unit/executions/embedded3/joba.job b/unit/executions/embedded3/joba.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embedded3/joba.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
unit/executions/embedded3/jobb.job 3(+3 -0)
diff --git a/unit/executions/embedded3/jobb.job b/unit/executions/embedded3/jobb.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded3/jobb.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embedded3/jobc.job 3(+3 -0)
diff --git a/unit/executions/embedded3/jobc.job b/unit/executions/embedded3/jobc.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded3/jobc.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embedded3/jobd.job 3(+3 -0)
diff --git a/unit/executions/embedded3/jobd.job b/unit/executions/embedded3/jobd.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded3/jobd.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embedded3/jobe.job 5(+5 -0)
diff --git a/unit/executions/embedded3/jobe.job b/unit/executions/embedded3/jobe.job
new file mode 100644
index 0000000..fe986d5
--- /dev/null
+++ b/unit/executions/embedded3/jobe.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=jobb,jobc,jobd
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index 9d755df..9df97ac 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -582,16 +582,16 @@ public class FlowRunnerPipelineTest {
FileUtils.copyDirectory(directory, workingDir);
}
- private void printCurrentState(String prefix, ExecutableFlowBase flow) {
- for(ExecutableNode node: flow.getExecutableNodes()) {
-
- System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
- if (node instanceof ExecutableFlowBase) {
- printCurrentState(prefix, (ExecutableFlowBase)node);
- }
- }
- }
-
+// private void printCurrentState(String prefix, ExecutableFlowBase flow) {
+// for(ExecutableNode node: flow.getExecutableNodes()) {
+//
+// System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
+// if (node instanceof ExecutableFlowBase) {
+// printCurrentState(prefix, (ExecutableFlowBase)node);
+// }
+// }
+// }
+//
private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName) throws Exception {
return createFlowRunner(eventCollector, flowName, groupName, new ExecutionOptions());
}
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 35fa39d..9031518 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -277,8 +277,6 @@ public class ExecutableFlowTest {
Assert.assertEquals(a.getOutNodes(), a.getOutNodes());
}
-
-
public static void testEquals(ExecutionOptions optionsA, ExecutionOptions optionsB) {
Assert.assertEquals(optionsA.getConcurrentOption(), optionsB.getConcurrentOption());
Assert.assertEquals(optionsA.getNotifyOnFirstFailure(), optionsB.getNotifyOnFirstFailure());