azkaban-aplcache
Changes
src/java/azkaban/flow/Edge.java 122(+80 -42)
src/java/azkaban/flow/Flow.java 480(+232 -248)
src/java/azkaban/flow/FlowProps.java 58(+58 -0)
src/java/azkaban/flow/LayeredFlowLayout.java 48(+48 -0)
src/java/azkaban/flow/Node.java 189(+145 -44)
src/java/azkaban/project/FileProjectManager.java 485(+243 -242)
src/java/azkaban/utils/DirectoryFlowLoader.java 190(+110 -80)
src/java/azkaban/utils/Props.java 2(+1 -1)
src/java/azkaban/webapp/servlet/ProjectManagerServlet.java 331(+181 -150)
Details
src/java/azkaban/flow/Edge.java 122(+80 -42)
diff --git a/src/java/azkaban/flow/Edge.java b/src/java/azkaban/flow/Edge.java
index 82f5b25..2c69d82 100644
--- a/src/java/azkaban/flow/Edge.java
+++ b/src/java/azkaban/flow/Edge.java
@@ -1,51 +1,89 @@
package azkaban.flow;
+import java.util.HashMap;
+
public class Edge {
- public enum State {
- FAILED, SUCCEEDED, WAITING, CYCLE
- }
-
- private final Node source;
- private final Node target;
-
- private State state = State.WAITING;
-
- public Edge(Node from, Node to) {
- this.source = from;
- this.target = to;
- }
-
- public Edge(Edge clone) {
- this.source = clone.source;
- this.target = clone.target;
- this.state = clone.state;
- }
-
- public String getId() {
- return getSourceId() + ">>" + getTargetId();
- }
-
- public State getState() {
- return state;
- }
-
- public void setState(State state) {
- this.state = state;
- }
-
- public Node getSource() {
- return source;
- }
-
- public Node getTarget() {
- return target;
- }
-
+ private final String sourceId;
+ private final String targetId;
+ private Node source;
+ private Node target;
+ private String error;
+
+
+ public Edge(String fromId, String toId) {
+ this.sourceId = fromId;
+ this.targetId = toId;
+ }
+
+ public Edge(Edge clone) {
+ this.sourceId = clone.sourceId;
+ this.targetId = clone.targetId;
+ this.error = clone.error;
+ }
+
+ public String getId() {
+ return getSourceId() + ">>" + getTargetId();
+ }
+
public String getSourceId() {
- return source == null? null : source.getId();
+ return sourceId;
}
public String getTargetId() {
- return target == null? null : target.getId();
+ return targetId;
+ }
+
+ public void setError(String error) {
+ this.error = error;
+ }
+
+ public String getError() {
+ return this.error;
+ }
+
+ public boolean hasError() {
+ return this.error != null;
}
+
+ public Node getSource() {
+ return source;
+ }
+
+ public void setSource(Node source) {
+ this.source = source;
+ }
+
+ public Node getTarget() {
+ return target;
+ }
+
+ public void setTarget(Node target) {
+ this.target = target;
+ }
+
+ public Object toObject() {
+ HashMap<String, Object> obj = new HashMap<String, Object>();
+ obj.put("source", getSourceId());
+ obj.put("target", getTargetId());
+ if (error != null) {
+ obj.put("error", error);
+ }
+
+ return obj;
+ }
+
+ public static Edge fromObject(Object obj) {
+ HashMap<String, Object> edgeObj = (HashMap<String,Object>)obj;
+
+ String source = (String)edgeObj.get("source");
+ String target = (String)edgeObj.get("target");
+
+ String error = (String)edgeObj.get("error");
+
+ Edge edge = new Edge(source, target);
+ edge.setError(error);
+
+ return edge;
+ }
+
}
src/java/azkaban/flow/Flow.java 480(+232 -248)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index b0ba280..f85b2d6 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -13,108 +13,153 @@ import azkaban.project.ResourceLoader;
import azkaban.utils.Props;
public class Flow {
- public enum State {
- READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
- }
- private final String id;
- private ArrayList<Node> baseNodes = new ArrayList<Node>();
- private HashMap<String, Node> nodes = new HashMap<String, Node>();
- private HashMap<String, Edge> edges = new HashMap<String, Edge>();
- private HashMap<String, Set<Edge>> sourceEdges = new HashMap<String, Set<Edge>>();
- private HashMap<String, Set<Edge>> targetEdges = new HashMap<String, Set<Edge>>();
- private HashMap<String, Props> flowProps = new HashMap<String, Props>();
- private ArrayList<String> errors;
+ public enum State {
+ READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
+ }
+ private final String id;
+ private ArrayList<Node> startNodes;
+ private ArrayList<Node> endNodes;
+
+ private HashMap<String, Node> nodes = new HashMap<String, Node>();
+
+ private HashMap<String, Edge> edges = new HashMap<String, Edge>();
+ private HashMap<String, Set<Edge>> outEdges = new HashMap<String, Set<Edge>>();
+ private HashMap<String, Set<Edge>> inEdges = new HashMap<String, Set<Edge>>();
+ private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
+
+ private ArrayList<String> errors;
+
+ private boolean isLayedOut = false;
+
+ public Flow(String id) {
+ this.id = id;
+ }
+
+ public void initialize() {
+ if (startNodes == null) {
+ startNodes = new ArrayList<Node>();
+ endNodes = new ArrayList<Node>();
+ for (Node node : nodes.values()) {
+ // If it doesn't have any incoming edges, its a start node
+ if (!inEdges.containsKey(node.getId())) {
+ startNodes.add(node);
+ }
- public Flow(String id) {
- this.id = id;
- }
+ // If it doesn't contain any outgoing edges, its an end node.
+ if (!outEdges.containsKey(node.getId())) {
+ endNodes.add(node);
+ }
+ }
+
+ for (Node node: startNodes) {
+ node.setLevel(0);
+ recursiveSetLevels(node);
+ }
+ }
+ }
- public void addBaseNode(Node node) {
- this.baseNodes.add(node);
- }
-
- public void addAllNodes(Collection<Node> nodes) {
- for (Node node: nodes) {
- addNode(node);
- }
- }
-
- public void addNode(Node node) {
- nodes.put(node.getId(), node);
- }
+ private void recursiveSetLevels(Node node) {
+ Set<Edge> edges = outEdges.get(node.getId());
+ if (edges != null) {
+ for (Edge edge : edges) {
+ Node nextNode = nodes.get(edge.getTargetId());
+ edge.setSource(node);
+ edge.setTarget(nextNode);
+
+ // We pick whichever is higher to get the max distance from root.
+ int level = Math.max(node.getLevel() + 1, nextNode.getLevel());
+ nextNode.setLevel(level);
+ recursiveSetLevels(nextNode);
+ }
+ }
+ }
- public void addProperties(Props props) {
- flowProps.put(props.getSource(), props);
- }
-
- public void addAllProperties(Collection<Props> props) {
- for (Props prop: props) {
- flowProps.put(prop.getSource(), prop);
- }
- }
-
- public String getId() {
- return id;
- }
-
- public void addError(String error) {
- if (errors == null) {
- errors = new ArrayList<String>();
- }
-
- errors.add(error);
- }
-
- public List<String> getErrors() {
- return errors;
- }
-
- public boolean hasErrors() {
- return errors != null && !errors.isEmpty();
- }
-
- public Collection<Node> getNodes() {
- return nodes.values();
- }
-
- public Collection<Edge> getEdges() {
- return edges.values();
- }
-
- public void addAllEdges(Collection<Edge> edges) {
- for (Edge edge: edges) {
- addEdge(edge);
- }
- }
-
- public void addEdge(Edge edge) {
- String source = edge.getSourceId();
- String target = edge.getTargetId();
+ public List<Node> getStartNodes() {
+ return startNodes;
+ }
+
+ public List<Node> getEndNodes() {
+ return endNodes;
+ }
+
+ public void addAllNodes(Collection<Node> nodes) {
+ for (Node node: nodes) {
+ addNode(node);
+ }
+ }
- if (edge instanceof ErrorEdge) {
- addError("Error on " + edge.getId() + ". " + ((ErrorEdge)edge).getError());
- }
+ public void addNode(Node node) {
+ nodes.put(node.getId(), node);
+ }
- Set<Edge> sourceSet = getEdgeSet(sourceEdges, source);
- sourceSet.add(edge);
-
- Set<Edge> targetSet = getEdgeSet(targetEdges, target);
- targetSet.add(edge);
-
- edges.put(edge.getId(), edge);
- }
-
- private Set<Edge> getEdgeSet(HashMap<String, Set<Edge>> map, String id) {
- Set<Edge> edges = map.get(id);
- if (edges == null) {
- edges = new HashSet<Edge>();
- map.put(id, edges);
- }
-
- return edges;
- }
-
- public Map<String,Object> toObject() {
+ public void addAllFlowProperties(Collection<FlowProps> props) {
+ for (FlowProps prop : props) {
+ flowProps.put(prop.getSource(), prop);
+ }
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void addError(String error) {
+ if (errors == null) {
+ errors = new ArrayList<String>();
+ }
+
+ errors.add(error);
+ }
+
+ public List<String> getErrors() {
+ return errors;
+ }
+
+ public boolean hasErrors() {
+ return errors != null && !errors.isEmpty();
+ }
+
+ public Collection<Node> getNodes() {
+ return nodes.values();
+ }
+
+ public Collection<Edge> getEdges() {
+ return edges.values();
+ }
+
+ public void addAllEdges(Collection<Edge> edges) {
+ for (Edge edge: edges) {
+ addEdge(edge);
+ }
+ }
+
+ public void addEdge(Edge edge) {
+ String source = edge.getSourceId();
+ String target = edge.getTargetId();
+
+ if (edge.hasError()) {
+ addError("Error on " + edge.getId() + ". " + edge.getError());
+ }
+
+ Set<Edge> sourceSet = getEdgeSet(outEdges, source);
+ sourceSet.add(edge);
+
+ Set<Edge> targetSet = getEdgeSet(inEdges, target);
+ targetSet.add(edge);
+
+ edges.put(edge.getId(), edge);
+ }
+
+ private Set<Edge> getEdgeSet(HashMap<String, Set<Edge>> map, String id) {
+ Set<Edge> edges = map.get(id);
+ if (edges == null) {
+ edges = new HashSet<Edge>();
+ map.put(id, edges);
+ }
+
+ return edges;
+ }
+
+ public Map<String,Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "flow");
flowObj.put("id", getId());
@@ -126,174 +171,113 @@ public class Flow {
}
return flowObj;
- }
-
- @SuppressWarnings("unchecked")
- public static Flow flowFromObject(Object object, ResourceLoader loader) {
- Map<String, Object> flowObject = (Map<String,Object>)object;
-
- String id = (String)flowObject.get("id");
- Flow flow = new Flow(id);
-
- // Loading projects
- List<Object> propertiesList = (List<Object>)flowObject.get("props");
- Map<String, Props> properties = loadPropertiesFromObject(propertiesList, loader);
- flow.addAllProperties(properties.values());
-
- // Loading nodes
- List<Object> nodeList = (List<Object>)flowObject.get("nodes");
- Map<String, Node> nodes = loadNodesFromObjects(nodeList, properties, loader);
- flow.addAllNodes(nodes.values());
-
- // Loading edges
- List<Object> edgeList = (List<Object>)flowObject.get("edges");
- List<Edge> edges = loadEdgeFromObjects(edgeList, nodes, loader);
- flow.addAllEdges(edges);
-
- return flow;
- }
-
- private static Map<String, Node> loadNodesFromObjects(List<Object> nodeList, Map<String, Props> properties, ResourceLoader loader) {
- Map<String, Node> nodeMap = new HashMap<String, Node>();
-
- for (Object obj: nodeList) {
- @SuppressWarnings("unchecked")
- Map<String,Object> nodeObj = (Map<String,Object>)obj;
- String id = (String)nodeObj.get("id");
- String propsSource = (String)nodeObj.get("props.source");
- String inheritedSource = (String)nodeObj.get("inherited.source");
-
- Props inheritedProps = properties.get(inheritedSource);
- Props props = loader.loadPropsFromSource(inheritedProps, propsSource);
-
- Node node = new Node(id, props);
- nodeMap.put(id, node);
- }
-
- return nodeMap;
- }
-
- private static List<Edge> loadEdgeFromObjects(List<Object> edgeList, Map<String, Node> nodes, ResourceLoader loader) {
- List<Edge> edgeResult = new ArrayList<Edge>();
-
- for (Object obj: edgeList) {
- @SuppressWarnings("unchecked")
- Map<String,Object> edgeObj = (Map<String,Object>)obj;
- String id = (String)edgeObj.get("id");
- String source = (String)edgeObj.get("source");
- String target = (String)edgeObj.get("target");
-
- Node sourceNode = nodes.get(source);
- Node targetNode = nodes.get(target);
- String error = (String)edgeObj.get("error");
-
- Edge edge = null;
- if (sourceNode == null && targetNode != null) {
- edge = new ErrorEdge(source, target, "Edge Error: Neither source " + source + " nor " + target + " could be found.");
- }
- else if (sourceNode == null && targetNode != null) {
- edge = new ErrorEdge(source, target, "Edge Error: Source " + source + " could not be found. Target: " + target);
- }
- else if (sourceNode != null && targetNode == null) {
- edge = new ErrorEdge(source, target, "Edge Error: Source found " + source + ", but " + target + " could be found.");
- }
- else if (error != null) {
- edge = new ErrorEdge(source, target, error);
- }
- else {
- edge = new Edge(sourceNode, targetNode);
- }
-
- edgeResult.add(edge);
- }
-
- return edgeResult;
- }
-
- @SuppressWarnings("unchecked")
- private static Map<String, Props> loadPropertiesFromObject(List<Object> propertyObjectList, ResourceLoader loader) {
- Map<String, Props> properties = new HashMap<String, Props>();
-
- Map<String, String> sourceToInherit = new HashMap<String,String>();
- for (Object propObj: propertyObjectList) {
- Map<String, Object> mapObj = (Map<String,Object>)propObj;
- String source = (String)mapObj.get("source");
- String inherits = (String)mapObj.get("inherits");
-
- sourceToInherit.put(source, inherits);
- }
-
- for (String source: sourceToInherit.keySet()) {
- recursiveResolveProps(source, sourceToInherit, loader, properties);
- }
-
- return properties;
- }
-
- private static void recursiveResolveProps(String source, Map<String, String> sourceToInherit, ResourceLoader loader, Map<String, Props> properties) {
- Props prop = properties.get(source);
- if (prop != null) {
- return;
- }
-
- String inherits = sourceToInherit.get(source);
- Props parent = null;
- if (inherits != null) {
- recursiveResolveProps(inherits, sourceToInherit, loader, properties);
- parent = properties.get(inherits);
- }
+ }
+
+ private List<Object> objectizeProperties() {
+ ArrayList<Object> result = new ArrayList<Object>();
+ for (FlowProps props: flowProps.values()) {
+ Object objProps = props.toObject();
+ result.add(objProps);
+ }
+
+ return result;
+ }
- prop = loader.loadPropsFromSource(parent, source);
- properties.put(source, prop);
- }
-
- private List<Map<String,Object>> objectizeNodes() {
- ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+ private List<Object> objectizeNodes() {
+ ArrayList<Object> result = new ArrayList<Object>();
for (Node node : getNodes()) {
- HashMap<String, Object> nodeObj = new HashMap<String, Object>();
- nodeObj.put("id", node.getId());
- nodeObj.put("props.source", node.getProps().getSource());
- Props parentProps = node.getProps().getParent();
-
- if (parentProps != null) {
- nodeObj.put("inherited.source", parentProps.getSource());
- }
+ Object nodeObj = node.toObject();
result.add(nodeObj);
}
return result;
}
- private List<Map<String,Object>> objectizeEdges() {
- ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+ private List<Object> objectizeEdges() {
+ ArrayList<Object> result = new ArrayList<Object>();
for (Edge edge: getEdges()) {
- HashMap<String, Object> edgeObj = new HashMap<String, Object>();
- edgeObj.put("id", edge.getId());
- edgeObj.put("source", edge.getSourceId());
- edgeObj.put("target", edge.getTargetId());
- if (edge instanceof ErrorEdge) {
- ErrorEdge errorEdge = (ErrorEdge)edge;
- edgeObj.put("error", errorEdge.getError());
- }
+ Object edgeObj = edge.toObject();
result.add(edgeObj);
}
return result;
}
- private List<Map<String,Object>> objectizeProperties() {
+ @SuppressWarnings("unchecked")
+ public static Flow flowFromObject(Object object) {
+ Map<String, Object> flowObject = (Map<String,Object>)object;
- ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
- for (Props props: flowProps.values()) {
- HashMap<String, Object> propObj = new HashMap<String, Object>();
- propObj.put("source", props.getSource());
- Props parent = props.getParent();
- if (parent != null) {
- propObj.put("inherits", parent.getSource());
- }
- result.add(propObj);
+ String id = (String)flowObject.get("id");
+ Flow flow = new Flow(id);
+
+ // Loading projects
+ List<Object> propertiesList = (List<Object>)flowObject.get("props");
+ Map<String, FlowProps> properties = loadPropertiesFromObject(propertiesList);
+ flow.addAllFlowProperties(properties.values());
+
+ // Loading nodes
+ List<Object> nodeList = (List<Object>)flowObject.get("nodes");
+ Map<String, Node> nodes = loadNodesFromObjects(nodeList);
+ flow.addAllNodes(nodes.values());
+
+ // Loading edges
+ List<Object> edgeList = (List<Object>)flowObject.get("edges");
+ List<Edge> edges = loadEdgeFromObjects(edgeList, nodes);
+ flow.addAllEdges(edges);
+
+ return flow;
+ }
+
+ private static Map<String, Node> loadNodesFromObjects(List<Object> nodeList) {
+ Map<String, Node> nodeMap = new HashMap<String, Node>();
+
+ for (Object obj: nodeList) {
+ Node node = Node.fromObject(obj);
+ nodeMap.put(node.getId(), node);
}
- return result;
+ return nodeMap;
+ }
+
+ private static List<Edge> loadEdgeFromObjects(List<Object> edgeList, Map<String, Node> nodes) {
+ List<Edge> edgeResult = new ArrayList<Edge>();
+
+ for (Object obj: edgeList) {
+ Edge edge = Edge.fromObject(obj);
+ edgeResult.add(edge);
+ }
+
+ return edgeResult;
+ }
+
+ private static Map<String, FlowProps> loadPropertiesFromObject(List<Object> propertyObjectList) {
+ Map<String, FlowProps> properties = new HashMap<String, FlowProps>();
+
+ for (Object propObj: propertyObjectList) {
+ FlowProps prop = FlowProps.fromObject(propObj);
+ properties.put(prop.getSource(), prop);
+ }
+
+ return properties;
+ }
+
+ public boolean isLayedOut() {
+ return isLayedOut;
+ }
+
+ public void setLayedOut(boolean layedOut) {
+ this.isLayedOut = layedOut;
+ }
+
+ /*package*/ Map<String, Node> getNodeMap() {
+ return nodes;
+ }
+
+ /*package*/ Map<String, Set<Edge>> getOutEdgeMap() {
+ return outEdges;
+ }
+
+ /*package*/ Map<String, Set<Edge>> getInEdgeMap() {
+ return inEdges;
}
}
\ No newline at end of file
diff --git a/src/java/azkaban/flow/FlowLayout.java b/src/java/azkaban/flow/FlowLayout.java
new file mode 100644
index 0000000..de5c7b3
--- /dev/null
+++ b/src/java/azkaban/flow/FlowLayout.java
@@ -0,0 +1,5 @@
+package azkaban.flow;
+
+public interface FlowLayout {
+ public void layoutFlow(Flow flow);
+}
src/java/azkaban/flow/FlowProps.java 58(+58 -0)
diff --git a/src/java/azkaban/flow/FlowProps.java b/src/java/azkaban/flow/FlowProps.java
new file mode 100644
index 0000000..77c7183
--- /dev/null
+++ b/src/java/azkaban/flow/FlowProps.java
@@ -0,0 +1,58 @@
+package azkaban.flow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.utils.Props;
+
+public class FlowProps {
+ private String parentSource;
+ private String propSource;
+ private Props props = null;
+
+ public FlowProps(String parentSource, String propSource) {
+ this.parentSource = parentSource;
+ this.propSource = propSource;
+ }
+
+ public FlowProps(Props props) {
+ this.setProps(props);
+ }
+
+ public Props getProps() {
+ return props;
+ }
+
+ public void setProps(Props props) {
+ this.props = props;
+ this.parentSource = props.getParent() == null ? null : props.getParent().getSource();
+ this.propSource = props.getSource();
+ }
+
+ public String getSource() {
+ return propSource;
+ }
+
+ public String getInheritedSource() {
+ return parentSource;
+ }
+
+ public Object toObject() {
+ HashMap<String, Object> obj = new HashMap<String, Object>();
+ obj.put("source", propSource);
+ if (parentSource != null) {
+ obj.put("inherits", parentSource);
+ }
+ return obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static FlowProps fromObject(Object obj) {
+ Map<String, Object> flowMap = (Map<String, Object>)obj;
+ String source = (String)flowMap.get("source");
+ String parentSource = (String)flowMap.get("inherits");
+
+ FlowProps flowProps = new FlowProps(parentSource, source);
+ return flowProps;
+ }
+}
src/java/azkaban/flow/LayeredFlowLayout.java 48(+48 -0)
diff --git a/src/java/azkaban/flow/LayeredFlowLayout.java b/src/java/azkaban/flow/LayeredFlowLayout.java
new file mode 100644
index 0000000..33b10b3
--- /dev/null
+++ b/src/java/azkaban/flow/LayeredFlowLayout.java
@@ -0,0 +1,48 @@
+package azkaban.flow;
+
+import java.awt.geom.Point2D;
+import java.util.ArrayList;
+import java.util.Map;
+
+public class LayeredFlowLayout implements FlowLayout{
+ @Override
+ public void layoutFlow(Flow flow) {
+ analyzeFlow(flow);
+ flow.setLayedOut(true);
+ }
+
+ private void analyzeFlow(Flow flow) {
+ Map<String, Node> node = flow.getNodeMap();
+
+ }
+
+ private class WrappedNode extends LayeredNode {
+ private Node node;
+ public WrappedNode(Node node) {
+ this.node = node;
+ }
+ public Node getNode() {
+ return node;
+ }
+ }
+
+ private class LayeredNode {
+ private Point2D point;
+ private int level;
+ private ArrayList<LayeredNode> inNodes;
+ private ArrayList<LayeredNode> outNodes;
+
+ public int getLevel() {
+ return level;
+ }
+ public void setLevel(int level) {
+ this.level = level;
+ }
+ public Point2D getPoint() {
+ return point;
+ }
+ public void setPoint(Point2D point) {
+ this.point = point;
+ }
+ }
+}
src/java/azkaban/flow/Node.java 189(+145 -44)
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index ce2fae7..9a08879 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -1,50 +1,151 @@
package azkaban.flow;
+import java.awt.geom.Point2D;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngineResult.Status;
+
import azkaban.utils.Props;
public class Node {
- public enum State {
- FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED
- }
-
- private final String id;
-
- private State state = State.WAITING;
-
- private Props props;
-
- public Node(String id, Props props) {
- this.id = id;
- this.props = props;
- }
-
- /**
- * Clones nodes
- * @param node
- */
- public Node(Node clone) {
- this.id = clone.id;
- this.props = clone.props;
- this.state = clone.state;
- }
-
- public String getId() {
- return id;
- }
-
- public State getState() {
- return state;
- }
-
- public void setState(State state) {
- this.state = state;
- }
-
- public Props getProps() {
- return props;
- }
-
- public void setProps(Props props) {
- this.props = props;
- }
+ public enum State {
+ FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED
+ }
+
+ private final String id;
+ private String jobSource;
+ private String propsSource;
+ private State state = State.WAITING;
+
+ private Point2D.Double position = null;
+ private int level;
+ private int expectedRunTimeSec = 1;
+
+ public Node(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Clones nodes
+ * @param node
+ */
+ public Node(Node clone) {
+ this.id = clone.id;
+ this.propsSource = clone.propsSource;
+ this.jobSource = clone.jobSource;
+ this.state = clone.state;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public void setState(State state) {
+ this.state = state;
+ }
+
+ public Point2D.Double getPosition() {
+ return position;
+ }
+
+ public void setPosition(Point2D.Double position) {
+ this.position = position;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ public void setLevel(int level) {
+ this.level = level;
+ }
+
+ public String getJobSource() {
+ return jobSource;
+ }
+
+ public void setJobSource(String jobSource) {
+ this.jobSource = jobSource;
+ }
+
+ public String getPropsSource() {
+ return propsSource;
+ }
+
+ public void setPropsSource(String propsSource) {
+ this.propsSource = propsSource;
+ }
+
+ public void setExpectedRuntimeSec(int runtimeSec) {
+ expectedRunTimeSec = runtimeSec;
+ }
+
+ public int getExpectedRuntimeSec() {
+ return expectedRunTimeSec;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Node fromObject(Object obj) {
+ Map<String,Object> mapObj = (Map<String,Object>)obj;
+ String id = (String)mapObj.get("id");
+
+ Node node = new Node(id);
+ String jobSource = (String)mapObj.get("job.source");
+ String propSource = (String)mapObj.get("prop.source");
+ node.setJobSource(jobSource);
+ node.setPropsSource(propSource);
+
+ Integer expectedRuntime = (Integer)mapObj.get("expectedRuntime");
+ if (expectedRuntime != null) {
+ node.setExpectedRuntimeSec(expectedRuntime);
+ }
+
+ String stateStr = (String)mapObj.get("status");
+ if (stateStr != null) {
+ State state = State.valueOf(stateStr);
+ if (state != null) {
+ node.setState(state);
+ }
+ }
+
+ Map<String,Object> layoutInfo = (Map<String,Object>)mapObj.get("layout");
+ if (layoutInfo != null) {
+ Double x = (Double)layoutInfo.get("x");
+ Double y = (Double)layoutInfo.get("y");
+ Integer level = (Integer)layoutInfo.get("level");
+
+ if (x != null && y != null) {
+ node.setPosition(new Point2D.Double(x, y));
+ }
+ if (level != null) {
+ node.setLevel(level);
+ }
+ }
+
+ return node;
+ }
+
+ public Object toObject() {
+ HashMap<String, Object> objMap = new HashMap<String, Object>();
+ objMap.put("id", id);
+ objMap.put("job.source", jobSource);
+ objMap.put("prop.source", propsSource);
+ objMap.put("expectedRuntime", expectedRunTimeSec);
+ objMap.put("state", state.toString());
+
+ if (position != null) {
+ HashMap<String, Object> layoutInfo = new HashMap<String, Object>();
+ layoutInfo.put("x", position.x);
+ layoutInfo.put("y", position.y);
+ layoutInfo.put("level", level);
+ objMap.put("layout", layoutInfo);
+ }
+
+ return objMap;
+ }
}
\ No newline at end of file
src/java/azkaban/project/FileProjectManager.java 485(+243 -242)
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 14e3c50..041d90f 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -6,7 +6,6 @@ import java.io.FileWriter;
import java.io.IOException;
import java.security.AccessControlException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -20,7 +19,7 @@ import azkaban.flow.Flow;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
-import azkaban.utils.FlowUtils;
+import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
@@ -35,180 +34,183 @@ public class FileProjectManager implements ProjectManager {
private static final String PROPERTIES_FILENAME = "project.json";
private static final String PROJECT_DIRECTORY = "src";
private static final String FLOW_EXTENSION = ".flow";
- private static final Logger logger = Logger.getLogger(FileProjectManager.class);
- private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
-
+ private static final Logger logger = Logger.getLogger(FileProjectManager.class);
+ private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
+
private File projectDirectory;
-
- public FileProjectManager(Props props) {
- setupDirectories(props);
- loadAllProjects();
- }
- private void setupDirectories(Props props) {
- String projectDir = props.getString(DIRECTORY_PARAM);
+ public FileProjectManager(Props props) {
+ setupDirectories(props);
+ loadAllProjects();
+ }
+
+ private void setupDirectories(Props props) {
+ String projectDir = props.getString(DIRECTORY_PARAM);
logger.info("Using directory " + projectDir + " as the project directory.");
- projectDirectory = new File(projectDir);
- if (!projectDirectory.exists()) {
- logger.info("Directory " + projectDir + " doesn't exist. Creating.");
- if (projectDirectory.mkdirs()) {
- logger.info("Directory creation was successful.");
- }
- else {
- throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
- }
- }
- else if (projectDirectory.isFile()) {
+ projectDirectory = new File(projectDir);
+ if (!projectDirectory.exists()) {
+ logger.info("Directory " + projectDir + " doesn't exist. Creating.");
+ if (projectDirectory.mkdirs()) {
+ logger.info("Directory creation was successful.");
+ }
+ else {
+ throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
+ }
+ }
+ else if (projectDirectory.isFile()) {
throw new RuntimeException("FileProjectManager directory " + projectDirectory + " is really a file.");
- }
- }
-
- private void loadAllProjects() {
- File[] directories = projectDirectory.listFiles();
-
- for (File dir: directories) {
- if (!dir.isDirectory()) {
- logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory." );
- }
- else {
- File propertiesFile = new File(dir, PROPERTIES_FILENAME);
- if (!propertiesFile.exists()) {
- logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " not found." );
- }
- else {
- Object obj = null;
- try {
- obj = JSONUtils.parseJSONFromFile(propertiesFile);
+ }
+ }
+
+ private void loadAllProjects() {
+ File[] directories = projectDirectory.listFiles();
+
+ for (File dir: directories) {
+ if (!dir.isDirectory()) {
+ logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory." );
+ }
+ else {
+ File propertiesFile = new File(dir, PROPERTIES_FILENAME);
+ if (!propertiesFile.exists()) {
+ logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " not found." );
+ }
+ else {
+ Object obj = null;
+ try {
+ obj = JSONUtils.parseJSONFromFile(propertiesFile);
} catch (IOException e) {
logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e );
continue;
}
-
- Project project = Project.projectFromObject(obj);
- logger.info("Loading project " + project.getName());
- projects.put(project.getName(), project);
-
- String source = project.getSource();
- if (source == null) {
- logger.info(project.getName() + ": No flows uploaded");
- continue;
- }
-
- File projectDir = new File(dir, source);
- if (!projectDir.exists()) {
- logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
- }
- else if (!projectDir.isDirectory()) {
- logger.error("ERROR project source dir " + projectDir + " is not a directory.");
- }
- else {
- File projectSourceDir = new File(projectDir, PROJECT_DIRECTORY);
- FileResourceLoader loader = new FileResourceLoader(projectSourceDir);
- File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
- Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
- for (File flowFile: flowFiles) {
+
+ Project project = Project.projectFromObject(obj);
+ logger.info("Loading project " + project.getName());
+ projects.put(project.getName(), project);
+
+ String source = project.getSource();
+ if (source == null) {
+ logger.info(project.getName() + ": No flows uploaded");
+ continue;
+ }
+
+ File projectDir = new File(dir, source);
+ if (!projectDir.exists()) {
+ logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
+ }
+ else if (!projectDir.isDirectory()) {
+ logger.error("ERROR project source dir " + projectDir + " is not a directory.");
+ }
+ else {
+ File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
+ Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
+ for (File flowFile: flowFiles) {
Object objectizedFlow = null;
- try {
- objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
+ try {
+ objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
} catch (IOException e) {
logger.error("Error parsing flow file " + flowFile.toString());
}
-
- //Recreate Flow
- Flow flow = Flow.flowFromObject(objectizedFlow, loader);
- logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
- flowMap.put(flow.getId(), flow);
- }
-
- synchronized (project) {
- project.setFlows(flowMap);
- }
- }
- }
- }
- }
- }
-
- public List<String> getProjectNames() {
- return new ArrayList<String>(projects.keySet());
- }
-
- public List<Project> getProjects(User user) {
- ArrayList<Project> array = new ArrayList<Project>();
- for(Project project : projects.values()) {
- Permission perm = project.getUserPermission(user);
- if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
- array.add(project);
- }
- }
- return array;
- }
-
- public Project getProject(String name, User user) {
- Project project = projects.get(name);
- if (project != null) {
- Permission perm = project.getUserPermission(user);
- if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
- return project;
- }
- else {
- throw new AccessControlException("Permission denied. Do not have read access.");
- }
- }
- return null;
- }
-
- public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
+
+ //Recreate Flow
+ Flow flow = Flow.flowFromObject(objectizedFlow);
+ logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
+ flow.initialize();
+ flowMap.put(flow.getId(), flow);
+ }
+
+ synchronized (project) {
+ project.setFlows(flowMap);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public List<String> getProjectNames() {
+ return new ArrayList<String>(projects.keySet());
+ }
+
+ public List<Project> getProjects(User user) {
+ ArrayList<Project> array = new ArrayList<Project>();
+ for(Project project : projects.values()) {
+ Permission perm = project.getUserPermission(user);
+ if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
+ array.add(project);
+ }
+ }
+ return array;
+ }
+
+ public Project getProject(String name, User user) {
+ Project project = projects.get(name);
+ if (project != null) {
+ Permission perm = project.getUserPermission(user);
+ if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
+ return project;
+ }
+ else {
+ throw new AccessControlException("Permission denied. Do not have read access.");
+ }
+ }
+ return project;
+ }
+
+ public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
logger.info("Uploading files to " + projectName);
- Project project = projects.get(projectName);
-
- if (project == null) {
- throw new ProjectManagerException("Project not found.");
- }
+ Project project = projects.get(projectName);
+
+ if (project == null) {
+ throw new ProjectManagerException("Project not found.");
+ }
if (!project.hasPermission(uploader, Type.WRITE)) {
throw new AccessControlException("Permission denied. Do not have write access.");
}
-
- Map<String, Flow> flows = new HashMap<String,Flow>();
+
List<String> errors = new ArrayList<String>();
- FlowUtils.loadProjectFlows(dir, flows, errors);
-
- File projectPath = new File(projectDirectory, projectName);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ loader.loadProjectFlow(dir);
+ Map<String, Flow> flows = loader.getFlowMap();
+
+ File projectPath = new File(projectDirectory, projectName);
File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
if (!installDir.mkdir()) {
throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
- }
+ }
for (Flow flow: flows.values()) {
- try {
+ try {
+ if (flow.getErrors() != null) {
+ errors.addAll(flow.getErrors());
+ }
writeFlowFile(installDir, flow);
} catch (IOException e) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory, e);
+ throw new ProjectManagerException(
+ "Project directory " + projectName +
+ " cannot be created in " + projectDirectory, e);
}
}
- File destDirectory = new File(installDir, PROJECT_DIRECTORY);
+ File destDirectory = new File(installDir, PROJECT_DIRECTORY);
dir.renameTo(destDirectory);
// We install only if the project is not forced install or has no errors
if (force || errors.isEmpty()) {
- // We synchronize on project so that we don't collide when uploading.
- synchronized (project) {
- logger.info("Uploading files to " + projectName);
- project.setSource(installDir.getName());
- project.setLastModifiedTimestamp(System.currentTimeMillis());
- project.setLastModifiedUser(uploader.getUserId());
- project.setFlows(flows);
- }
-
- try {
+ // We synchronize on project so that we don't collide when uploading.
+ synchronized (project) {
+ logger.info("Uploading files to " + projectName);
+ project.setSource(installDir.getName());
+ project.setLastModifiedTimestamp(System.currentTimeMillis());
+ project.setLastModifiedUser(uploader.getUserId());
+ project.setFlows(flows);
+ }
+
+ try {
writeProjectFile(projectPath, project);
} catch (IOException e) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory, e);
+ throw new ProjectManagerException(
+ "Project directory " + projectName +
+ " cannot be created in " + projectDirectory, e);
}
}
else {
@@ -221,109 +223,108 @@ public class FileProjectManager implements ProjectManager {
throw new ProjectManagerException(bufferErrors.toString());
}
-
- }
-
- @Override
- public synchronized Project createProject(String projectName, String description, User creator) throws ProjectManagerException {
- if (projectName == null || projectName.trim().isEmpty()) {
- throw new ProjectManagerException("Project name cannot be empty.");
- }
- else if (description == null || description.trim().isEmpty()) {
- throw new ProjectManagerException("Description cannot be empty.");
- }
- else if (creator == null) {
- throw new ProjectManagerException("Valid creator user must be set.");
- }
+ }
- if (projects.contains(projectName)) {
- throw new ProjectManagerException("Project already exists.");
- }
-
- File projectPath = new File(projectDirectory, projectName);
- if (projectPath.exists()) {
- throw new ProjectManagerException("Project already exists.");
- }
-
- if(!projectPath.mkdirs()) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory);
- }
-
- Permission perm = new Permission(Type.ADMIN);
- long time = System.currentTimeMillis();
-
- Project project = new Project(projectName);
- project.setUserPermission(creator.getUserId(), perm);
- project.setDescription(description);
- project.setCreateTimestamp(time);
- project.setLastModifiedTimestamp(time);
- project.setLastModifiedUser(creator.getUserId());
-
- logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
- try {
+ @Override
+ public synchronized Project createProject(String projectName, String description, User creator) throws ProjectManagerException {
+ if (projectName == null || projectName.trim().isEmpty()) {
+ throw new ProjectManagerException("Project name cannot be empty.");
+ }
+ else if (description == null || description.trim().isEmpty()) {
+ throw new ProjectManagerException("Description cannot be empty.");
+ }
+ else if (creator == null) {
+ throw new ProjectManagerException("Valid creator user must be set.");
+ }
+
+ if (projects.contains(projectName)) {
+ throw new ProjectManagerException("Project already exists.");
+ }
+
+ File projectPath = new File(projectDirectory, projectName);
+ if (projectPath.exists()) {
+ throw new ProjectManagerException("Project already exists.");
+ }
+
+ if(!projectPath.mkdirs()) {
+ throw new ProjectManagerException(
+ "Project directory " + projectName +
+ " cannot be created in " + projectDirectory);
+ }
+
+ Permission perm = new Permission(Type.ADMIN);
+ long time = System.currentTimeMillis();
+
+ Project project = new Project(projectName);
+ project.setUserPermission(creator.getUserId(), perm);
+ project.setDescription(description);
+ project.setCreateTimestamp(time);
+ project.setLastModifiedTimestamp(time);
+ project.setLastModifiedUser(creator.getUserId());
+
+ logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
+ try {
writeProjectFile(projectPath, project);
} catch (IOException e) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory, e);
+ throw new ProjectManagerException(
+ "Project directory " + projectName +
+ " cannot be created in " + projectDirectory, e);
}
- projects.put(projectName, project);
- return project;
- }
-
- private synchronized void writeProjectFile(File directory, Project project) throws IOException {
- Object object = project.toObject();
- File tmpFile = File.createTempFile("project-",".json", directory);
-
- if (tmpFile.exists()) {
- tmpFile.delete();
- }
-
- logger.info("Writing project file " + tmpFile);
- String output = JSONUtils.toJSON(object, true);
-
- FileWriter writer = new FileWriter(tmpFile);
- try {
- writer.write(output);
- } catch (IOException e) {
- if (writer != null) {
- writer.close();
- }
-
- throw e;
- }
- writer.close();
-
- File projectFile = new File(directory, PROPERTIES_FILENAME);
- File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
-
- projectFile.renameTo(swapFile);
- tmpFile.renameTo(projectFile);
- swapFile.delete();
-
- }
-
- private void writeFlowFile(File directory, Flow flow) throws IOException {
+ projects.put(projectName, project);
+ return project;
+ }
+
+ private synchronized void writeProjectFile(File directory, Project project) throws IOException {
+ Object object = project.toObject();
+ File tmpFile = File.createTempFile("project-",".json", directory);
+
+ if (tmpFile.exists()) {
+ tmpFile.delete();
+ }
+
+ logger.info("Writing project file " + tmpFile);
+ String output = JSONUtils.toJSON(object, true);
+
+ FileWriter writer = new FileWriter(tmpFile);
+ try {
+ writer.write(output);
+ } catch (IOException e) {
+ if (writer != null) {
+ writer.close();
+ }
+
+ throw e;
+ }
+ writer.close();
+
+ File projectFile = new File(directory, PROPERTIES_FILENAME);
+ File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
+
+ projectFile.renameTo(swapFile);
+ tmpFile.renameTo(projectFile);
+ swapFile.delete();
+
+ }
+
+ private void writeFlowFile(File directory, Flow flow) throws IOException {
Object object = flow.toObject();
- String filename = flow.getId() + FLOW_EXTENSION;
- File outputFile = new File(directory, filename);
- logger.info("Writing flow file " + outputFile);
- String output = JSONUtils.toJSON(object, true);
-
- FileWriter writer = new FileWriter(outputFile);
- try {
- writer.write(output);
- } catch (IOException e) {
- if (writer != null) {
- writer.close();
- }
-
- throw e;
- }
- writer.close();
- }
+ String filename = flow.getId() + FLOW_EXTENSION;
+ File outputFile = new File(directory, filename);
+ logger.info("Writing flow file " + outputFile);
+ String output = JSONUtils.toJSON(object, true);
+
+ FileWriter writer = new FileWriter(outputFile);
+ try {
+ writer.write(output);
+ } catch (IOException e) {
+ if (writer != null) {
+ writer.close();
+ }
+
+ throw e;
+ }
+ writer.close();
+ }
@Override
public synchronized Project removeProject(String projectName, User user) {
@@ -336,7 +337,7 @@ public class FileProjectManager implements ProjectManager {
public SuffixFilter(String suffix) {
this.suffix = suffix;
}
-
+
@Override
public boolean accept(File pathname) {
String name = pathname.getName();
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 1475abc..ee4e9f9 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -35,7 +35,14 @@ public class Project {
this.flows = flows;
}
- @SuppressWarnings("unused")
+ public Flow getFlow(String flowId) {
+ if (flows == null) {
+ return null;
+ }
+
+ return flows.get(flowId);
+ }
+
public List<Flow> getFlows() {
List<Flow> retFlow = null;
if (flows != null) {
src/java/azkaban/utils/Props.java 2(+1 -1)
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index fa462fa..dad659b 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -95,7 +95,7 @@ public class Props {
}
input.close();
}
-
+
/**
* Create props from property input streams
*
src/java/azkaban/webapp/servlet/ProjectManagerServlet.java 331(+181 -150)
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index f4d36c4..890c0e3 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -37,100 +37,131 @@ import azkaban.webapp.session.Session;
import azkaban.webapp.servlet.MultipartParser;
public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
- private static final long serialVersionUID = 1;
- private static final Logger logger = Logger.getLogger(ProjectManagerServlet.class);
- private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
-
- private ProjectManager manager;
- private MultipartParser multipartParser;
- private File tempDir;
- private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
+ private static final long serialVersionUID = 1;
+ private static final Logger logger = Logger.getLogger(ProjectManagerServlet.class);
+ private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
+
+ private ProjectManager manager;
+ private MultipartParser multipartParser;
+ private File tempDir;
+ private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
@Override
public int compare(Flow f1, Flow f2) {
return f1.getId().compareTo(f2.getId());
}
- };
-
- @Override
- public void init(ServletConfig config) throws ServletException {
- super.init(config);
- manager = this.getApplication().getProjectManager();
- tempDir = this.getApplication().getTempDirectory();
- multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
- }
-
- @Override
- protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
- Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/projectmanager.vm");
- User user = session.getUser();
-
- if ( hasParam(req, "project") ) {
- String projectName = getParam(req, "project");
- Project project = null;
- try {
- project = manager.getProject(projectName, user);
- if (project == null) {
- page.add("errorMsg", "Project " + projectName + " not found.");
- }
- else {
- page.add("project", project);
- page.add("admins", Utils.flattenToString(project.getUsersWithPermission(Type.ADMIN), ","));
- page.add("permissions", project.getUserPermission(user));
-
- List<Flow> flows = project.getFlows();
- if (!flows.isEmpty()) {
- Collections.sort(flows, FLOW_ID_COMPARATOR);
- page.add("flows", flows);
- }
- }
- }
- catch (AccessControlException e) {
- page.add("errorMsg", e.getMessage());
- }
- }
- else {
- page.add("errorMsg", "No project set.");
- }
- page.render();
- }
-
- @Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
- if (ServletFileUpload.isMultipartContent(req)) {
- logger.info("Post is multipart");
- Map<String, Object> params = multipartParser.parseMultipart(req);
- if (params.containsKey("action")) {
- String action = (String)params.get("action");
- if (action.equals("upload")) {
- handleUpload(req, resp, params, session);
- }
- }
- }
- else if (hasParam(req, "action")) {
- String action = getParam(req, "action");
- if (action.equals("create")) {
- handleCreate(req, resp, session);
- }
- }
-
- }
-
- private void handleCreate(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException {
-
- String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
- String projectDescription = hasParam(req, "description") ? getParam(req, "description") : null;
- logger.info("Create project " + projectName);
-
- User user = session.getUser();
-
- String status = null;
- String action = null;
- String message = null;
- HashMap<String, Object> params = null;
- try {
+ };
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+ manager = this.getApplication().getProjectManager();
+ tempDir = this.getApplication().getTempDirectory();
+ multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ if ( hasParam(req, "project") ) {
+ if (hasParam(req, "flow")) {
+ handleFlowPage(req, resp, session);
+ }
+ else {
+ handleProjectPage(req, resp, session);
+ }
+ return;
+ }
+
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/projectpage.vm");
+ page.add("errorMsg", "No project set.");
+ page.render();
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ if (ServletFileUpload.isMultipartContent(req)) {
+ logger.info("Post is multipart");
+ Map<String, Object> params = multipartParser.parseMultipart(req);
+ if (params.containsKey("action")) {
+ String action = (String)params.get("action");
+ if (action.equals("upload")) {
+ handleUpload(req, resp, params, session);
+ }
+ }
+ }
+ else if (hasParam(req, "action")) {
+ String action = getParam(req, "action");
+ if (action.equals("create")) {
+ handleCreate(req, resp, session);
+ }
+ }
+ }
+
+ private void handleFlowPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/flowpage.vm");
+ String projectName = getParam(req, "project");
+ String flowName = getParam(req, "flow");
+
+ User user = session.getUser();
+ Project project = null;
+ try {
+ project = manager.getProject(projectName, user);
+ if (project == null) {
+ page.add("errorMsg", "Project " + projectName + " not found.");
+ }
+ else {
+ page.add("project", project);
+
+ }
+ }
+ catch (AccessControlException e) {
+ page.add("errorMsg", e.getMessage());
+ }
+ page.render();
+ }
+
+ private void handleProjectPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/projectpage.vm");
+ String projectName = getParam(req, "project");
+
+ User user = session.getUser();
+ Project project = null;
+ try {
+ project = manager.getProject(projectName, user);
+ if (project == null) {
+ page.add("errorMsg", "Project " + projectName + " not found.");
+ }
+ else {
+ page.add("project", project);
+ page.add("admins", Utils.flattenToString(project.getUsersWithPermission(Type.ADMIN), ","));
+ page.add("permissions", project.getUserPermission(user));
+
+ List<Flow> flows = project.getFlows();
+ if (!flows.isEmpty()) {
+ Collections.sort(flows, FLOW_ID_COMPARATOR);
+ page.add("flows", flows);
+ }
+ }
+ }
+ catch (AccessControlException e) {
+ page.add("errorMsg", e.getMessage());
+ }
+ page.render();
+ }
+
+ private void handleCreate(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException {
+
+ String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
+ String projectDescription = hasParam(req, "description") ? getParam(req, "description") : null;
+ logger.info("Create project " + projectName);
+
+ User user = session.getUser();
+
+ String status = null;
+ String action = null;
+ String message = null;
+ HashMap<String, Object> params = null;
+ try {
manager.createProject(projectName, projectDescription, user);
status = "success";
action = "redirect";
@@ -141,70 +172,70 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
message = e.getMessage();
status = "error";
}
-
- String response = createJsonResponse(status, message, action, params);
- try {
+
+ String response = createJsonResponse(status, message, action, params);
+ try {
Writer write = resp.getWriter();
write.append(response);
write.flush();
- } catch (IOException e) {
+ } catch (IOException e) {
e.printStackTrace();
}
- }
-
- private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart,
- Session session) throws ServletException, IOException {
-
- User user = session.getUser();
- String projectName = (String) multipart.get("project");
- FileItem item = (FileItem) multipart.get("file");
- String forceStr = (String) multipart.get("force");
- boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
- File projectDir = null;
- if (projectName == null || projectName.isEmpty()) {
- setErrorMessageInCookie(resp, "No project name found.");
- }
- else if (item == null) {
- setErrorMessageInCookie(resp, "No file found.");
- }
- else {
- try {
- projectDir = extractFile(item);
- manager.uploadProject(projectName, projectDir, user, force);
- setSuccessMessageInCookie(resp, "Project Uploaded");
- }
- catch (Exception e) {
- logger.info("Installation Failed.", e);
- setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
- }
-
- if (projectDir != null && projectDir.exists() ) {
- FileUtils.deleteDirectory(projectDir);
- }
- resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
- }
- }
-
- private File extractFile(FileItem item) throws IOException, ServletException {
- final String contentType = item.getContentType();
- if (contentType.startsWith("application/zip")) {
- return unzipFile(item);
- }
-
- throw new ServletException(String.format("Unsupported file type[%s].", contentType));
- }
-
- private File unzipFile(FileItem item) throws ServletException, IOException {
- File temp = File.createTempFile("job-temp", ".zip");
- temp.deleteOnExit();
- OutputStream out = new BufferedOutputStream(new FileOutputStream(temp));
- IOUtils.copy(item.getInputStream(), out);
- out.close();
- ZipFile zipfile = new ZipFile(temp);
- File unzipped = Utils.createTempDir(tempDir);
- Utils.unzip(zipfile, unzipped);
- temp.delete();
- return unzipped;
- }
+ }
+
+ private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart,
+ Session session) throws ServletException, IOException {
+
+ User user = session.getUser();
+ String projectName = (String) multipart.get("project");
+ FileItem item = (FileItem) multipart.get("file");
+ String forceStr = (String) multipart.get("force");
+ boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
+ File projectDir = null;
+ if (projectName == null || projectName.isEmpty()) {
+ setErrorMessageInCookie(resp, "No project name found.");
+ }
+ else if (item == null) {
+ setErrorMessageInCookie(resp, "No file found.");
+ }
+ else {
+ try {
+ projectDir = extractFile(item);
+ manager.uploadProject(projectName, projectDir, user, force);
+ setSuccessMessageInCookie(resp, "Project Uploaded");
+ }
+ catch (Exception e) {
+ logger.info("Installation Failed.", e);
+ setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
+ }
+
+ if (projectDir != null && projectDir.exists() ) {
+ FileUtils.deleteDirectory(projectDir);
+ }
+ resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+ }
+ }
+
+ private File extractFile(FileItem item) throws IOException, ServletException {
+ final String contentType = item.getContentType();
+ if (contentType.startsWith("application/zip")) {
+ return unzipFile(item);
+ }
+
+ throw new ServletException(String.format("Unsupported file type[%s].", contentType));
+ }
+
+ private File unzipFile(FileItem item) throws ServletException, IOException {
+ File temp = File.createTempFile("job-temp", ".zip");
+ temp.deleteOnExit();
+ OutputStream out = new BufferedOutputStream(new FileOutputStream(temp));
+ IOUtils.copy(item.getInputStream(), out);
+ out.close();
+ ZipFile zipfile = new ZipFile(temp);
+ File unzipped = Utils.createTempDir(tempDir);
+ Utils.unzip(zipfile, unzipped);
+ temp.delete();
+ return unzipped;
+ }
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
new file mode 100644
index 0000000..2f945a2
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -0,0 +1,119 @@
+<!DOCTYPE html>
+<html>
+ <head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+ <script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>
+ <script type="text/javascript" src="${context}/js/namespace.js"></script>
+ <script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+ <script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.project.view.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
+ </script>
+ </head>
+ <body>
+#set($current_page="all")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+ <div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
+
+ <div class="content">
+#if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+#else
+#if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+#elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+#end
+
+ <div id="all-jobs-content">
+ <div class="section-hd">
+ <h2>Project <span>$project.name</span></h2>
+
+ <a id="project-upload-btn" class="btn1 projectupload" href="#">Upload</a>
+ <a id="project-permission-btn" class="btn5 projectpermission" href="#">Permissions</a>
+ </div><!-- end .section-hd -->
+ </div>
+
+ <div id="project-users">
+ <table class="user-table">
+ <tr><td class="first">Project Admins:</td><td>$admins</td></tr>
+ <tr><td class="first">User ${user_id} Permissions:</td><td>$permissions.toString()</td></tr>
+ </table>
+ </div>
+
+ <div id="project-summary">
+ <table class="summary-table">
+ <tr><td class="first">Name:</td><td>$project.name</td></tr>
+ <tr><td class="first">Created Date:</td><td>$utils.formatDate($project.lastModifiedTimestamp)</td></tr>
+ <tr><td class="first">Modified Date:</td><td>$utils.formatDate($project.createTimestamp)</td></tr>
+ <tr><td class="first">Last Modified by:</td><td>$project.lastModifiedUser</td></tr>
+ <tr><td class="first">Description:</td><td>$project.description</td></tr>
+ </table>
+ </div>
+
+ <div id="flow-tabs">
+ <table id="all-jobs" class="all-jobs job-table">
+ <thead>
+ <tr>
+ <th class="tb-name">Flow Name</th>
+ </tr>
+ </thead>
+ <tbody>
+#if($flows)
+#foreach($flow in $flows)
+ <tr class="row">
+ <td class="tb-name">
+ <div class="jobfolder expand" onclick="expandFlow(this)" id="${flow.id}">
+ <span class="state-icon"></span>
+ <a href="${context}/manager?flow=${flow.id}">${flow.id}</a>
+ </div>
+ </td>
+ </tr>
+#end
+#else
+ <tr><td class="last">No flows uploaded to this project.</td></tr>
+#end
+ </tbody>
+ </table
+ </div>
+#end
+ </div>
+
+ <div id="upload-project" class="modal">
+ <h3>Upload Project Files</h3>
+ <div id="errorMsg" class="box-error-message">$errorMsg</div>
+ <div class="message">
+ <form id="upload-form" enctype="multipart/form-data" method="post" action="$!context/manager">
+ <fieldset>
+ <dl>
+ <dt>Job Archive</dt>
+ <dd><input id="file" name="file" class="file" type="file" onChange="changeFile()" /></dd>
+ <input type="hidden" name="project" value="$project.name" />
+ <input type="hidden" name="action" value="upload" />
+ </dl>
+ </fieldset>
+ </form>
+ </div>
+ <div class="actions">
+ <a class="yes btn2" id="upload-btn" href="#">Upload</a>
+ <a class="no simplemodal-close btn3" href="#">Cancel</a>
+ </div>
+ <div id="invalid-session" class="modal">
+ <h3>Invalid Session</h3>
+ <p>Session has expired. Please re-login.</p>
+ <div class="actions">
+ <a class="yes btn2" id="login-btn" href="#">Re-login</a>
+ </div>
+ </div>
+ </div>
+ </body>
+</html>
+