azkaban-aplcache
Changes
src/java/azkaban/flow/Edge.java 32(+23 -9)
src/java/azkaban/flow/ErrorEdge.java 44(+44 -0)
src/java/azkaban/flow/Flow.java 61(+47 -14)
src/java/azkaban/flow/Node.java 41(+9 -32)
src/java/azkaban/utils/FlowUtils.java 192(+175 -17)
Details
src/java/azkaban/flow/Edge.java 32(+23 -9)
diff --git a/src/java/azkaban/flow/Edge.java b/src/java/azkaban/flow/Edge.java
index fde4acb..82f5b25 100644
--- a/src/java/azkaban/flow/Edge.java
+++ b/src/java/azkaban/flow/Edge.java
@@ -5,18 +5,24 @@ public class Edge {
FAILED, SUCCEEDED, WAITING, CYCLE
}
- private final Node from;
- private final Node to;
+ private final Node source;
+ private final Node target;
private State state = State.WAITING;
public Edge(Node from, Node to) {
- this.from = from;
- this.to = to;
+ this.source = from;
+ this.target = to;
}
+ public Edge(Edge clone) {
+ this.source = clone.source;
+ this.target = clone.target;
+ this.state = clone.state;
+ }
+
public String getId() {
- return from.getId() + ">>" + to.getId();
+ return getSourceId() + ">>" + getTargetId();
}
public State getState() {
@@ -27,11 +33,19 @@ public class Edge {
this.state = state;
}
- public Node getFrom() {
- return from;
+ public Node getSource() {
+ return source;
}
- public Node getTo() {
- return to;
+ public Node getTarget() {
+ return target;
}
+
+ public String getSourceId() {
+ return source == null? null : source.getId();
+ }
+
+ public String getTargetId() {
+ return target == null? null : target.getId();
+ }
}
src/java/azkaban/flow/ErrorEdge.java 44(+44 -0)
diff --git a/src/java/azkaban/flow/ErrorEdge.java b/src/java/azkaban/flow/ErrorEdge.java
new file mode 100644
index 0000000..136e970
--- /dev/null
+++ b/src/java/azkaban/flow/ErrorEdge.java
@@ -0,0 +1,44 @@
+package azkaban.flow;
+
+public class ErrorEdge extends Edge {
+ private final String sourceId;
+ private final String targetId;
+ private final String error;
+
+ public ErrorEdge(String source, Node target, String error) {
+ super(null, target);
+ this.targetId = target.getId();
+ this.sourceId = source;
+ this.error = error;
+ }
+
+ public ErrorEdge(Node source, String target, String error) {
+ super(source, null);
+ this.targetId = target;
+ this.sourceId = source.getId();
+ this.error = error;
+ }
+
+
+ public ErrorEdge(Node source, Node target, String error) {
+ super(source, target);
+ this.targetId = target.getId();
+ this.sourceId = source.getId();
+ this.error = error;
+ }
+
+ @Override
+ public String getSourceId() {
+ return sourceId;
+ }
+
+ @Override
+ public String getTargetId() {
+ return targetId;
+ }
+
+ public String getError() {
+ return error;
+ }
+
+}
src/java/azkaban/flow/Flow.java 61(+47 -14)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 2775f4f..e23edf4 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -3,49 +3,82 @@ package azkaban.flow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
public class Flow {
public enum State {
READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
}
private final String id;
+ private ArrayList<Node> baseNodes = new ArrayList<Node>();
private HashMap<String, Node> nodes = new HashMap<String, Node>();
- private HashMap<String, Edge> edges = new HashMap<String, Edge>();
- private ArrayList<String> errors = null;
+ private HashMap<String, Set<Edge>> sourceEdges = new HashMap<String, Set<Edge>>();
+ private HashMap<String, Set<Edge>> targetEdges = new HashMap<String, Set<Edge>>();
+ private ArrayList<Object> errors;
public Flow(String id) {
this.id = id;
}
+ public void addBaseNode(Node node) {
+ this.baseNodes.add(node);
+ }
+
public void addAllNodes(Collection<Node> nodes) {
for (Node node: nodes) {
this.nodes.put(node.getId(), node);
}
}
-
- public void addAllEdges(Collection<Edge> edges) {
- for (Edge edge: edges) {
- this.edges.put(edge.getId(), edge);
- }
- }
public void addNode(Node node) {
nodes.put(node.getId(), node);
}
- public void addEdge(Edge edge) {
- edges.put(edge.getId(), edge);
- }
-
public String getId() {
return id;
}
- public void addErrors(String error) {
+ public void addError(Object error) {
if (errors == null) {
- errors = new ArrayList<String>();
+ errors = new ArrayList<Object>();
}
errors.add(error);
}
+
+ public List<Object> getErrors() {
+ return errors;
+ }
+
+ public boolean hasErrors() {
+ return errors != null && !errors.isEmpty();
+ }
+
+ public void addEdge(Edge edge) {
+ String source = edge.getSourceId();
+ String target = edge.getTargetId();
+
+ if (edge instanceof ErrorEdge) {
+ addError(edge); // errors is created lazily; calling errors.add directly throws NullPointerException on the first error
+ }
+
+ Set<Edge> sourceSet = getEdgeSet(sourceEdges, source);
+ sourceSet.add(edge);
+
+ Set<Edge> targetSet = getEdgeSet(targetEdges, target);
+ targetSet.add(edge);
+
+ }
+
+ private Set<Edge> getEdgeSet(HashMap<String, Set<Edge>> map, String id) {
+ Set<Edge> edges = map.get(id);
+ if (edges == null) {
+ edges = new HashSet<Edge>();
+ map.put(id, edges);
+ }
+
+ return edges;
+ }
}
\ No newline at end of file
src/java/azkaban/flow/Node.java 41(+9 -32)
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index a0b1aec..833fc42 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -1,8 +1,5 @@
package azkaban.flow;
-import java.util.ArrayList;
-import java.util.List;
-
import azkaban.utils.Props;
public class Node {
@@ -11,9 +8,7 @@ public class Node {
}
private final String id;
- private List<Edge> outEdges = new ArrayList<Edge>();
- private List<Edge> inEdges = new ArrayList<Edge>();
- private List<String> missingDependency;
+
private State state = State.WAITING;
private Props props;
@@ -21,12 +16,14 @@ public class Node {
this.id = id;
}
- public void addOutEdges(Edge edge) {
- outEdges.add(edge);
- }
-
- public void addInEdges(Edge edge) {
- inEdges.add(edge);
+ /**
+ * Copy constructor: copies the id, props and state of the given node.
+ * @param clone the node to copy
+ */
+ public Node(Node clone) {
+ this.id = clone.id;
+ this.props = clone.props;
+ this.state = clone.state;
}
public String getId() {
@@ -48,24 +45,4 @@ public class Node {
public void setProps(Props props) {
this.props = props;
}
-
- public List<Edge> getOutEdges() {
- return outEdges;
- }
-
- public List<Edge> getInEdges() {
- return inEdges;
- }
-
- public void addMissingDependency(String dependency) {
- if (missingDependency==null) {
- missingDependency = new ArrayList<String>();
- }
-
- missingDependency.add(dependency);
- }
-
- public boolean hasMissingDependency() {
- return missingDependency != null && missingDependency.size() > 0;
- }
}
\ No newline at end of file
src/java/azkaban/utils/FlowUtils.java 192(+175 -17)
diff --git a/src/java/azkaban/utils/FlowUtils.java b/src/java/azkaban/utils/FlowUtils.java
index b148255..3723b14 100644
--- a/src/java/azkaban/utils/FlowUtils.java
+++ b/src/java/azkaban/utils/FlowUtils.java
@@ -1,50 +1,208 @@
package azkaban.utils;
import java.io.File;
-import java.io.FileNamFilter;
-import java.io.FilenameFilter;
+import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import azkaban.flow.Edge;
+import azkaban.flow.ErrorEdge;
import azkaban.flow.Flow;
import azkaban.flow.Node;
public class FlowUtils {
- public static void loadProjectFilesFrom(File dir, List<Flow> resultFlow, List<Props> resultProps) {
- File[] propertyFiles = dir.listFiles();
- ArrayList<String> errors = new ArrayList<String>();
- Props propertyFile = null;
- for (File file: propertyFiles) {
-
- }
+ private static final DirFilter DIR_FILTER = new DirFilter();
+ private static final String PROPERTY_SUFFIX = ".properties";
+ private static final String DEPENDENCIES = "dependencies";
+ private static final String JOB_SUFFIX = ".job";
+
+ public static Map<String, Flow> loadProject(File dir) {
+ String base = dir.getAbsolutePath();
+
+ // Load all the project and job files.
+ Map<String,Node> jobMap = new HashMap<String,Node>();
+ Set<String> duplicateJobs = new HashSet<String>();
+ List<String> errors = new ArrayList<String>();
+ loadProjectFromDir(base, dir, jobMap, duplicateJobs, errors);
+
+ // Create edge dependency sets.
+ Map<String, Set<Edge>> dependencies = new HashMap<String, Set<Edge>>();
+ resolveDependencies(jobMap, duplicateJobs, dependencies);
+
+ HashMap<String, Flow> flows = buildFlowsFromDependencies(jobMap, dependencies);
+ return flows;
}
- private static void loadProjectFromDir(File dir, Map<String, Node> node, Map<String, Props> propsFiles, List<String> errors) {
- File[] propertyFiles = dir.listFiles(new SuffixFilter(".properties"));
+ /**
+ * Loads all the files, prop and job files. Props are assigned to the job nodes.
+ */
+ private static void loadProjectFromDir(String baseDir, File dir, Map<String, Node> jobMap, Set<String> duplicateJobs, List<String> errors) {
+ // Load all property files
+ File[] propertyFiles = dir.listFiles(new SuffixFilter(PROPERTY_SUFFIX));
Props parent = null;
for (File file: propertyFiles) {
try {
- Props props = new Props(parent, file);
+ parent = new Props(parent, file);
} catch (IOException e) {
errors.add("Error loading properties " + file.getName() + ":" + e.getMessage());
}
}
+
+ // Load all job files. A job name seen more than once is marked as a duplicate and dropped from the job map.
+ File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX));
+ for (File file: jobFiles) {
+ try {
+ String jobName = getJobName(file, JOB_SUFFIX);
+
+ if (!duplicateJobs.contains(jobName)) {
+ if (jobMap.containsKey(jobName)) {
+ duplicateJobs.add(jobName);
+ jobMap.remove(jobName);
+ }
+ else {
+ Props prop = new Props(parent, file);
+ Node node = new Node(jobName, prop);
+
+ jobMap.put(jobName, node);
+ }
+ }
+
+ } catch (IOException e) {
+ errors.add("Error loading job file " + file.getName() + ":" + e.getMessage());
+ }
+ }
+
+ File[] subDirs = dir.listFiles(DIR_FILTER);
+ for (File file: subDirs) {
+ loadProjectFromDir(baseDir, file, jobMap, duplicateJobs, errors);
+ }
+
+ }
+
+ private static void resolveDependencies(Map<String, Node> jobMap, Set<String> duplicateJobs, Map<String, Set<Edge>> nodeDependencies) {
+ // For every job, turn its "dependencies" property into edges (dependency -> job). Missing, ambiguous (duplicate) and self dependencies become ErrorEdges.
+ for (Node node: jobMap.values()) {
+ List<String> dependencyList = node.getProps().getStringList(DEPENDENCIES, (List<String>)null);
+
+ if (dependencyList != null && duplicateJobs != null) { // dependencyList is fetched with a null default, so it must be checked before iterating
+ Set<Edge> dependencies = nodeDependencies.get(node.getId());
+ if (dependencies == null) {
+ dependencies = new HashSet<Edge>();
+
+ for (String dependencyName : dependencyList) {
+ if (dependencyName == null || dependencyName.trim().isEmpty()) {
+ continue;
+ }
+
+ Node dependencyNode = jobMap.get(dependencyName);
+ if (dependencyNode == null) {
+ if (duplicateJobs.contains(dependencyName)) {
+ dependencies.add(new ErrorEdge(dependencyName, node, "Ambiguous Dependency. Duplicates found."));
+ }
+ else {
+ dependencies.add(new ErrorEdge(dependencyName, node, "Dependency not found."));
+ }
+ }
+ else if (dependencyNode == node) {
+ // We have a self cycle
+ dependencies.add(new ErrorEdge(dependencyName, node, "Self cycle found."));
+ }
+ else {
+ dependencies.add(new Edge(dependencyNode, node));
+ }
+ }
+
+ if (!dependencies.isEmpty()) {
+ nodeDependencies.put(node.getId(), dependencies);
+ }
+ }
+ }
+ }
+
+ }
+
+ private static HashMap<String, Flow> buildFlowsFromDependencies(Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies) {
+ // Collect the ids of every edge source (a job that some other job depends on); nodes whose id is not collected are the flow roots.
+ HashSet<String> nonRootNodes = new HashSet<String>();
+ for (Set<Edge> edges: nodeDependencies.values()) {
+ for (Edge edge: edges) {
+ nonRootNodes.add(edge.getSourceId());
+ }
+ }
+
+
+ HashMap<String, Flow> flows = new HashMap<String, Flow>();
+
+ // Now create one flow per root node, keyed by the root's id. Bad edges are recorded on the flow as ErrorEdges.
+ Set<String> visitedNodes = new HashSet<String>();
+ for (Node base: nodes.values()) {
+ if (!nonRootNodes.contains(base.getId())) { // the set holds ids, so look up by id; passing the Node itself never matches
+ Flow flow = new Flow(base.getId());
+ flow.addBaseNode(base);
+ constructFlow(flow, base, nodes, nodeDependencies, visitedNodes);
+ flows.put(base.getId(), flow);
+ }
+ }
+
+ return flows;
+ }
+
+ private static void constructFlow(Flow flow, Node node, Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies, Set<String> visited) {
+ visited.add(node.getId());
+ flow.addNode(node);
+ Set<Edge> dependencies = nodeDependencies.get(node.getId());
+
+ if (dependencies != null) {
+ for (Edge edge: dependencies) {
+ if (edge instanceof ErrorEdge) {
+ flow.addEdge(edge);
+ }
+ else if (visited.contains(edge.getSourceId())){
+ // We have a cycle. We set it as an error edge
+ edge = new ErrorEdge(edge.getSourceId(), node, "Cyclical dependencies found.");
+ flow.addEdge(edge);
+ }
+ else {
+ // This should not be null
+ flow.addEdge(edge);
+ Node fromNode = edge.getSource();
+ constructFlow(flow, fromNode, nodes, nodeDependencies, visited);
+ }
+ }
+ }
+
+ visited.remove(node.getId());
+ }
+
+ private static String getJobName(File file, String suffix) {
+ String filename = file.getName();
+ return filename.substring(0, filename.length() - suffix.length()); // strip the caller-supplied suffix (previously ignored in favour of JOB_SUFFIX)
+ }
+
+ private static class DirFilter implements FileFilter {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.isDirectory();
+ }
}
- private class SuffixFilter implements FilenameFilter {
+ private static class SuffixFilter implements FileFilter {
private String suffix;
public SuffixFilter(String suffix) {
this.suffix = suffix;
}
-
+
@Override
- public boolean accept(File dir, String name) {
- return name.endsWith(suffix);
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
}
-
}
}