azkaban-aplcache

Adding flow loading from file.

6/20/2012 10:46:42 PM

Details

diff --git a/src/java/azkaban/flow/Edge.java b/src/java/azkaban/flow/Edge.java
index fde4acb..82f5b25 100644
--- a/src/java/azkaban/flow/Edge.java
+++ b/src/java/azkaban/flow/Edge.java
@@ -5,18 +5,24 @@ public class Edge {
         FAILED, SUCCEEDED, WAITING, CYCLE
     }
 
-    private final Node from;
-    private final Node to;
+    private final Node source;
+    private final Node target;
 
     private State state = State.WAITING;
     
     public Edge(Node from, Node to) {
-        this.from = from;
-        this.to = to;
+        this.source = from;
+        this.target = to;
     }
 
+    public Edge(Edge clone) {
+    	this.source = clone.source;
+    	this.target = clone.target;
+    	this.state = clone.state;
+    }
+    
     public String getId() {
-        return from.getId() + ">>" + to.getId();
+        return getSourceId() + ">>" + getTargetId();
     }
 
     public State getState() {
@@ -27,11 +33,19 @@ public class Edge {
         this.state = state;
     }
 
-    public Node getFrom() {
-        return from;
+    public Node getSource() {
+        return source;
     }
 
-    public Node getTo() {
-        return to;
+    public Node getTarget() {
+        return target;
     }
+    
+	public String getSourceId() {
+		return source == null? null : source.getId();
+	}
+
+	public String getTargetId() {
+		return target == null? null : target.getId();
+	}
 }
diff --git a/src/java/azkaban/flow/ErrorEdge.java b/src/java/azkaban/flow/ErrorEdge.java
new file mode 100644
index 0000000..136e970
--- /dev/null
+++ b/src/java/azkaban/flow/ErrorEdge.java
@@ -0,0 +1,44 @@
+package azkaban.flow;
+/** An {@link Edge} that carries an error message and keeps its source/target ids as strings, so an id exists even when that end's Node is null. */
+public class ErrorEdge extends Edge {
+	private final String sourceId;
+	private final String targetId;
+	private final String error;
+	/** Error edge whose source is known only by id: the source Node handed to Edge is null. */
+	public ErrorEdge(String source, Node target, String error) {
+		super(null, target);
+		this.targetId = target.getId();
+		this.sourceId = source;
+		this.error = error;
+	}
+	/** Error edge whose target is known only by id: the target Node handed to Edge is null. */
+	public ErrorEdge(Node source, String target, String error) {
+		super(source, null);
+		this.targetId = target;
+		this.sourceId = source.getId();
+		this.error = error;
+	}
+	
+	/** Error edge between two existing nodes; both ids are copied from the nodes. */
+	public ErrorEdge(Node source, Node target, String error) {
+		super(source, target);
+		this.targetId = target.getId();
+		this.sourceId = source.getId();
+		this.error = error;
+	}
+	/** Returns the id stored at construction instead of deriving it from the (possibly null) source node. */
+	@Override
+	public String getSourceId() {
+		return sourceId;
+	}
+	/** Returns the id stored at construction instead of deriving it from the (possibly null) target node. */
+	@Override
+	public String getTargetId() {
+		return targetId;
+	}
+	/** Returns the error message given to the constructor. */
+	public String getError() {
+		return error;
+	}
+
+}
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 2775f4f..e23edf4 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -3,49 +3,82 @@ package azkaban.flow;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class Flow {
     public enum State {
         READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
     }
     private final String id;
+    private ArrayList<Node> baseNodes = new ArrayList<Node>();
     private HashMap<String, Node> nodes = new HashMap<String, Node>();
-    private HashMap<String, Edge> edges = new HashMap<String, Edge>();
-    private ArrayList<String> errors = null;
+    private HashMap<String, Set<Edge>> sourceEdges = new HashMap<String, Set<Edge>>();
+    private HashMap<String, Set<Edge>> targetEdges = new HashMap<String, Set<Edge>>();
+    private ArrayList<Object> errors;
 
     public Flow(String id) {
         this.id = id;
     }
 
+    public void addBaseNode(Node node) {
+    	this.baseNodes.add(node);
+    }
+    
     public void addAllNodes(Collection<Node> nodes) {
         for (Node node: nodes) {
             this.nodes.put(node.getId(), node);
         }
     }
-
-    public void addAllEdges(Collection<Edge> edges) {
-        for (Edge edge: edges) {
-            this.edges.put(edge.getId(), edge);
-        }
-    }
     
     public void addNode(Node node) {
         nodes.put(node.getId(), node);
     }
 
-    public void addEdge(Edge edge) {
-        edges.put(edge.getId(), edge);
-    }
-
     public String getId() {
         return id;
     }
     
-    public void addErrors(String error) {
+    public void addError(Object error) {
         if (errors == null) {
-            errors = new ArrayList<String>();
+            errors = new ArrayList<Object>();
         }
   
         errors.add(error);
     }
+    
+    public List<Object> getErrors() {
+    	return errors;
+    }
+    
+    public boolean hasErrors() {
+    	return errors != null && !errors.isEmpty();
+    }
+    
+    /** Indexes the edge under both its source id and its target id; an ErrorEdge is additionally recorded as a flow error. */
+    public void addEdge(Edge edge) {
+    	String source = edge.getSourceId();
+    	String target = edge.getTargetId();
+
+    	// 'errors' is created lazily by addError(); calling errors.add() directly would throw a NullPointerException on the first error edge.
+    	if (edge instanceof ErrorEdge) {
+    		addError(edge);
+    	}
+
+    	Set<Edge> sourceSet = getEdgeSet(sourceEdges, source);
+    	sourceSet.add(edge);
+    	Set<Edge> targetSet = getEdgeSet(targetEdges, target);
+    	targetSet.add(edge);
+    }
+    
+    private Set<Edge> getEdgeSet(HashMap<String, Set<Edge>> map, String id) {
+    	Set<Edge> edges = map.get(id);
+    	if (edges == null) {
+    		edges = new HashSet<Edge>();
+    		map.put(id, edges);
+    	}
+    	
+    	return edges;
+    }
 }
\ No newline at end of file
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index a0b1aec..833fc42 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -1,8 +1,5 @@
 package azkaban.flow;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import azkaban.utils.Props;
 
 public class Node {
@@ -11,9 +8,7 @@ public class Node {
     }
 
     private final String id;
-    private List<Edge> outEdges = new ArrayList<Edge>();
-    private List<Edge> inEdges = new ArrayList<Edge>();
-    private List<String> missingDependency;
+
     private State state = State.WAITING;
     private Props props;
     
@@ -21,12 +16,14 @@ public class Node {
         this.id = id;
     }
 
-    public void addOutEdges(Edge edge) {
-        outEdges.add(edge);
-    }
-
-    public void addInEdges(Edge edge) {
-        inEdges.add(edge);
+    /**
+     * Clones nodes
+     * @param node
+     */
+    public Node(Node clone) {
+    	this.id = clone.id;
+    	this.props = clone.props;
+    	this.state = clone.state;
     }
 
     public String getId() {
@@ -48,24 +45,4 @@ public class Node {
     public void setProps(Props props) {
         this.props = props;
     }
-
-    public List<Edge> getOutEdges() {
-        return outEdges;
-    }
-
-    public List<Edge> getInEdges() {
-        return inEdges;
-    }
-
-    public void addMissingDependency(String dependency) {
-        if (missingDependency==null) {
-            missingDependency = new ArrayList<String>();
-        }
-
-        missingDependency.add(dependency);
-    }
-
-    public boolean hasMissingDependency() {
-        return missingDependency != null && missingDependency.size() > 0;
-    }
 }
\ No newline at end of file
diff --git a/src/java/azkaban/utils/FlowUtils.java b/src/java/azkaban/utils/FlowUtils.java
index b148255..3723b14 100644
--- a/src/java/azkaban/utils/FlowUtils.java
+++ b/src/java/azkaban/utils/FlowUtils.java
@@ -1,50 +1,208 @@
 package azkaban.utils;
 
 import java.io.File;
-import java.io.FileNamFilter;
-import java.io.FilenameFilter;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import azkaban.flow.Edge;
+import azkaban.flow.ErrorEdge;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 
 public class FlowUtils {
-	public static void loadProjectFilesFrom(File dir, List<Flow> resultFlow, List<Props> resultProps) {
-		File[] propertyFiles = dir.listFiles();
-		ArrayList<String> errors = new ArrayList<String>();
-		Props propertyFile = null;
-		for (File file: propertyFiles) {
-			
-		}
+	private static final DirFilter DIR_FILTER = new DirFilter();
+	private static final String PROPERTY_SUFFIX = ".properties";
+	private static final String DEPENDENCIES = "dependencies";
+	private static final String JOB_SUFFIX = ".job";
+	
+	public static Map<String, Flow> loadProject(File dir) {
+		String base = dir.getAbsolutePath();
+		
+		// Load all the project and job files.
+		Map<String,Node> jobMap = new HashMap<String,Node>();
+		Set<String> duplicateJobs = new HashSet<String>();
+		List<String> errors = new ArrayList<String>();
+		loadProjectFromDir(base, dir, jobMap, duplicateJobs, errors);
+		
+		// Create edge dependency sets.
+		Map<String, Set<Edge>> dependencies = new HashMap<String, Set<Edge>>();
+		resolveDependencies(jobMap, duplicateJobs, dependencies);
+
+		HashMap<String, Flow> flows = buildFlowsFromDependencies(jobMap, dependencies);
+		return flows;
 	}
 	
-	private static void loadProjectFromDir(File dir, Map<String, Node> node, Map<String, Props> propsFiles, List<String> errors) {
-		File[] propertyFiles = dir.listFiles(new SuffixFilter(".properties"));
+	/**
+	 * Loads all the files, prop and job files. Props are assigned to the job nodes.
+	 */
+	private static void loadProjectFromDir(String baseDir, File dir, Map<String, Node> jobMap, Set<String> duplicateJobs, List<String> errors) {
+		// Load all property files
+		File[] propertyFiles = dir.listFiles(new SuffixFilter(PROPERTY_SUFFIX));
 		Props parent = null;
 		for (File file: propertyFiles) {
 			try {
-				Props props = new Props(parent, file);
+				parent = new Props(parent, file);
 			} catch (IOException e) {
 				errors.add("Error loading properties " + file.getName() + ":" + e.getMessage());
 			}
 		}
+		
+		// Load all Job files. If there's a duplicate name, then we don't load
+		File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX));
+		for (File file: jobFiles) {
+			try {
+				String jobName = getJobName(file, JOB_SUFFIX);
+
+				if (!duplicateJobs.contains(jobName)) {
+					if (jobMap.containsKey(jobName)) {
+						duplicateJobs.add(jobName);
+						jobMap.remove(jobName);
+					}
+					else {
+						Props prop = new Props(parent, file);
+						Node node = new Node(jobName, prop);
+					
+						jobMap.put(jobName, node);
+					}
+				}
+				
+			} catch (IOException e) {
+				errors.add("Error loading job file " + file.getName() + ":" + e.getMessage());
+			}
+		}
+		
+		File[] subDirs = dir.listFiles(DIR_FILTER);
+		for (File file: subDirs) {
+			loadProjectFromDir(baseDir, file, jobMap, duplicateJobs, errors);
+		}
+		
+	}
+	
+	/** Fills {@code nodeDependencies} with, per job id, the set of edges built from that job's "dependencies" property. */
+	private static void resolveDependencies(Map<String, Node> jobMap, Set<String> duplicateJobs, Map<String, Set<Edge>> nodeDependencies) {
+		// Build an edge per declared dependency. Missing, ambiguous (duplicate) and self-referencing dependencies become ErrorEdges.
+		for (Node node: jobMap.values()) {
+			List<String> dependencyList = node.getProps().getStringList(DEPENDENCIES, (List<String>)null);
+			if (dependencyList == null) { continue; } // null default was passed above: presumably returned when the job declares no dependencies
+			if (duplicateJobs != null) {
+				Set<Edge> dependencies = nodeDependencies.get(node.getId());
+				if (dependencies == null) {
+					dependencies = new HashSet<Edge>();
+					
+					for (String dependencyName : dependencyList) {
+						if (dependencyName == null || dependencyName.trim().isEmpty()) {
+							continue;
+						}
+						
+						Node dependencyNode = jobMap.get(dependencyName);
+						if (dependencyNode == null) {
+							if (duplicateJobs.contains(dependencyName)) {
+								dependencies.add(new ErrorEdge(dependencyName, node, "Ambiguous Dependency. Duplicates found."));
+							}
+							else {
+								dependencies.add(new ErrorEdge(dependencyName, node, "Dependency not found."));
+							}
+						}
+						else if (dependencyNode == node) {
+							// We have a self cycle
+							dependencies.add(new ErrorEdge(dependencyName, node, "Self cycle found."));
+						}
+						else {
+							dependencies.add(new Edge(dependencyNode, node));
+						}
+					}
+					
+					if (!dependencies.isEmpty()) {
+						nodeDependencies.put(node.getId(), dependencies);
+					}
+				}
+			}
+		}
+	}
+	
+	/** Builds one flow per root job, i.e. per job that no other job lists as a dependency; the result is keyed by root job id. */
+	private static HashMap<String, Flow> buildFlowsFromDependencies(Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies) {
+		// Every edge source is a dependency of some other job, so it cannot be a root.
+		HashSet<String> nonRootNodes = new HashSet<String>();
+		for (Set<Edge> edges: nodeDependencies.values()) {
+			for (Edge edge: edges) {
+				nonRootNodes.add(edge.getSourceId());
+			}
+		}
+
+		HashMap<String, Flow> flows = new HashMap<String, Flow>();
+		// Build a flow from each root; problems met on the way are added to the flow as error edges by constructFlow.
+		Set<String> visitedNodes = new HashSet<String>();
+		for (Node base: nodes.values()) {
+			// nonRootNodes holds job ids (Strings): probing it with the Node itself never matches and made every job a root.
+			if (!nonRootNodes.contains(base.getId())) {
+				Flow flow = new Flow(base.getId());
+				flow.addBaseNode(base);
+				constructFlow(flow, base, nodes, nodeDependencies, visitedNodes);
+				flows.put(base.getId(), flow);
+			}
+		}
+
+		return flows;
+	}
+	
+	/** Recursively adds {@code node} and the jobs it depends on to {@code flow}; {@code visited} holds the ids on the current recursion path. */
+	private static void constructFlow(Flow flow, Node node, Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies, Set<String> visited) {
+		visited.add(node.getId());
+		flow.addNode(node);
+		Set<Edge> dependencies = nodeDependencies.get(node.getId());
+		if (dependencies != null) {
+			for (Edge edge: dependencies) {
+				if (edge instanceof ErrorEdge) {
+					flow.addEdge(edge);
+				}
+				else if (visited.contains(edge.getSourceId())){
+					// The source is already on the current path, i.e. a cycle: replace the edge with an error edge and do not recurse.
+					edge = new ErrorEdge(edge.getSourceId(), node, "Cyclical dependencies found.");
+					flow.addEdge(edge);
+				}
+				else {
+					// Regular edge: its source node is presumably never null here, so recurse into it.
+					flow.addEdge(edge);
+					Node fromNode = edge.getSource();
+					constructFlow(flow, fromNode, nodes, nodeDependencies, visited);					
+				}
+			}
+		}
+		// Leaving this node: take it off the current path so other branches can reach it without being reported as a cycle.
+		visited.remove(node.getId());
+	}
+	
+	/** Returns the file's name with the trailing {@code suffix} removed; assumes the name ends with that suffix and is longer than it. */
+	private static String getJobName(File file, String suffix) {
+		String filename = file.getName();
+		return filename.substring(0, filename.length() - suffix.length()); } // uses the parameter; it was ignored in favour of JOB_SUFFIX
+
+	private static class DirFilter implements FileFilter {
+		@Override
+		public boolean accept(File pathname) {
+			return pathname.isDirectory();
+		}
 	}
 	
-	private class SuffixFilter implements FilenameFilter {
+	private static class SuffixFilter implements FileFilter {
 		private String suffix;
 		
 		public SuffixFilter(String suffix) {
 			this.suffix = suffix;
 		}
-		
+
 		@Override
-		public boolean accept(File dir, String name) {
-			return name.endsWith(suffix);
+		public boolean accept(File pathname) {
+			String name = pathname.getName();
+			
+			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
 		}
-		
 	}
 }