azkaban-aplcache

Loading of Flows.

6/23/2012 12:01:08 AM

Details

diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 05dc08c..ab104c0 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -5,8 +5,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import azkaban.utils.Props;
+
 public class Flow {
     public enum State {
         READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
@@ -14,6 +17,7 @@ public class Flow {
     private final String id;
     private ArrayList<Node> baseNodes = new ArrayList<Node>();
     private HashMap<String, Node> nodes = new HashMap<String, Node>();
+    private HashMap<String, Edge> edges = new HashMap<String, Edge>();
     private HashMap<String, Set<Edge>> sourceEdges = new HashMap<String, Set<Edge>>();
     private HashMap<String, Set<Edge>> targetEdges = new HashMap<String, Set<Edge>>();
     private ArrayList<Object> errors;
@@ -56,6 +60,14 @@ public class Flow {
     	return errors != null && !errors.isEmpty();
     }
     
+    public Collection<Node> getNodes() {
+    	return nodes.values();
+    }
+    
+    public Collection<Edge> getEdges() {
+    	return edges.values();
+    }
+    
     public void addEdge(Edge edge) {
     	String source = edge.getSourceId();
     	String target = edge.getTargetId();
@@ -70,6 +82,7 @@ public class Flow {
     	Set<Edge> targetSet = getEdgeSet(targetEdges, target);
     	targetSet.add(edge);
     	
+    	edges.put(edge.getId(), edge);
     }
     
     private Set<Edge> getEdgeSet(HashMap<String, Set<Edge>> map, String id) {
@@ -81,4 +94,85 @@ public class Flow {
     	
     	return edges;
     }
+    
+    public Map<String,Object> toObject() {
+		HashMap<String, Object> flowObj = new HashMap<String, Object>();
+		flowObj.put("type", "flow");
+		flowObj.put("id", getId());
+		flowObj.put("properties", objectizeProperties());
+		flowObj.put("nodes", objectizeNodes());
+		flowObj.put("edges", objectizeEdges());
+		
+		return flowObj;
+    }
+    
+	private List<Map<String,Object>> objectizeNodes() {
+		ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+		for (Node node : getNodes()) {
+			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+			nodeObj.put("id", node.getId());
+			nodeObj.put("props.source", node.getProps().getSource());
+			Props parentProps = node.getProps().getParent();
+			
+			if (parentProps != null) {
+				nodeObj.put("inherited.source", parentProps.getSource());
+			}
+			result.add(nodeObj);
+		}
+		
+		return result;
+	}
+	
+	private List<Map<String,Object>> objectizeEdges() {
+		ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+		for (Edge edge: getEdges()) {
+			HashMap<String, Object> edgeObj = new HashMap<String, Object>();
+			edgeObj.put("id", edge.getId());
+			edgeObj.put("source", edge.getSourceId());
+			edgeObj.put("target", edge.getTargetId());
+			if (edge instanceof ErrorEdge) {
+				ErrorEdge errorEdge = (ErrorEdge)edge;
+				edgeObj.put("error", errorEdge.getError());
+			}
+			result.add(edgeObj);
+		}
+		
+		return result;
+	}
+	
+	@SuppressWarnings("unchecked")
+	private List<Map<String,Object>> objectizeProperties() {
+		ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+		
+		HashMap<String, Object> properties = new HashMap<String, Object>();
+		for (Node node: getNodes()) {
+			Props props = node.getProps().getParent();
+			if (props != null) {
+				traverseAndObjectizeProperties(properties, props);
+			}
+		}
+		
+		for (Object propMap : properties.values()) {
+			result.add((Map<String,Object>)propMap);
+		}
+		
+		return result;
+	}
+	
+	private void traverseAndObjectizeProperties(HashMap<String, Object> properties, Props props) {
+		if (props.getSource() == null || properties.containsKey(props.getSource())) {
+			return;
+		}
+		
+		HashMap<String, Object> propObj = new HashMap<String,Object>();
+		propObj.put("source", props.getSource());
+		properties.put(props.getSource(), propObj);
+		
+		Props parent = props.getParent();
+		if (parent != null) {
+			propObj.put("inherits", parent.getSource());
+			
+			traverseAndObjectizeProperties(properties, parent);
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index a8f1f29..68012f6 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -10,6 +10,7 @@ public class Node {
     private final String id;
 
     private State state = State.WAITING;
+
     private Props props;
     
     public Node(String id, Props props) {
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 3d0fe13..0ad73bf 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -11,6 +11,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.flow.ErrorEdge;
 import azkaban.flow.Flow;
@@ -28,7 +30,10 @@ import azkaban.utils.Props;
  */
 public class FileProjectManager implements ProjectManager {
 	public static final String DIRECTORY_PARAM = "file.project.loader.path";
+	private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
 	private static final String PROPERTIES_FILENAME = "project.json";
+	private static final String PROJECT_DIRECTORY = "src";
+	private static final String FLOW_EXTENSION = ".flow";
     private static final Logger logger = Logger.getLogger(FileProjectManager.class);
     private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
 
@@ -128,13 +133,34 @@ public class FileProjectManager implements ProjectManager {
     	
 		Map<String, Flow> flows = new HashMap<String,Flow>();
 		List<String> errors = new ArrayList<String>();
-		FlowUtils.loadProject(dir, flows, errors);
+		List<Props> propsList = new ArrayList<Props>();
+		FlowUtils.loadProject(dir, flows, propsList, errors);
+		
+    	File projectPath = new File(projectDirectory, projectName);
+		File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
+		if (!installDir.mkdir()) {
+			throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
+		}
+		
+		for (Flow flow: flows.values()) {
+	    	try {
+				writeFlowFile(installDir, flow);
+			} catch (IOException e) {
+	    		throw new ProjectManagerException(
+	    				"Project directory " + projectName + 
+	    				" cannot be created in " + projectDirectory, e);
+			}
+		}
+		
+    	File destDirectory = new File(installDir, PROJECT_DIRECTORY);
+		dir.renameTo(destDirectory);
 		
 		// We install only if the project is not forced install or has no errors
 		if (force || errors.isEmpty()) {
 	    	// We synchronize on project so that we don't collide when uploading.
 	    	synchronized (project) {
 	    		logger.info("Uploading files to " + projectName);
+	    		project.setSource(projectDirectory.getName());
 	    		project.setLastModifiedTimestamp(System.currentTimeMillis());
 	    		project.setLastModifiedUser(uploader.getUserId());
 	    	}
@@ -146,6 +172,7 @@ public class FileProjectManager implements ProjectManager {
 				bufferErrors.append(error);
 				bufferErrors.append("\n");
 			}
+			
 			throw new ProjectManagerException(bufferErrors.toString());
 		}
 		
@@ -200,10 +227,43 @@ public class FileProjectManager implements ProjectManager {
     	return project;
     }
     
-    private void writeProjectFile(File directory, Project project) throws IOException {
+    private synchronized void writeProjectFile(File directory, Project project) throws IOException {
     	Object object = project.toObject();
-    	File outputFile = new File(directory, PROPERTIES_FILENAME);
-    	logger.info("Writing project file " + outputFile);
+    	File tmpFile = File.createTempFile("project-",".json", directory);
+
+    	if (tmpFile.exists()) {
+    		tmpFile.delete();
+    	}
+
+    	logger.info("Writing project file " + tmpFile);
+    	String output = JSONUtils.toJSON(object, true);
+    	
+    	FileWriter writer = new FileWriter(tmpFile);
+    	try {
+    		writer.write(output);
+    	} catch (IOException e) {
+    		if (writer != null) {
+    			writer.close();
+    		}
+    		
+    		throw e;
+    	}
+    	writer.close();
+
+    	File projectFile = new File(directory, PROPERTIES_FILENAME);
+    	File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
+    	
+    	projectFile.renameTo(swapFile);
+    	tmpFile.renameTo(projectFile);
+    	swapFile.delete();
+
+    }
+    
+    private void writeFlowFile(File directory, Flow flow) throws IOException {
+		Object object = flow.toObject();
+    	String filename = flow.getId() + FLOW_EXTENSION;
+    	File outputFile = new File(directory, filename);
+    	logger.info("Writing flow file " + outputFile);
     	String output = JSONUtils.toJSON(object, true);
     	
     	FileWriter writer = new FileWriter(outputFile);
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 12a4934..15ea37f 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -15,8 +15,9 @@ public class Project {
 	private long createTimestamp;
 	private long lastModifiedTimestamp;
 	private String lastModifiedUser;
+	private String source;
 	private HashMap<String, Permission> userToPermission = new HashMap<String, Permission>();
-
+	
 	public Project(String name) {
 		this.name = name;
 	}
@@ -88,6 +89,10 @@ public class Project {
 		projectObject.put("createTimestamp", createTimestamp);
 		projectObject.put("lastModifiedTimestamp", lastModifiedTimestamp);
 		projectObject.put("lastModifiedUser", lastModifiedUser);
+		
+		if (source != null) {
+			projectObject.put("source", source);
+		}
 
 		ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
 		for (Map.Entry<String, Permission> entry : userToPermission.entrySet()) {
@@ -112,13 +117,19 @@ public class Project {
 				.get("createTimestamp"));
 		long lastModifiedTimestamp = coerceToLong(projectObject
 				.get("lastModifiedTimestamp"));
-
+		String source = (String)projectObject.get("source");
+		
 		Project project = new Project(name);
 		project.setDescription(description);
 		project.setCreateTimestamp(createTimestamp);
 		project.setLastModifiedTimestamp(lastModifiedTimestamp);
 		project.setLastModifiedUser(lastModifiedUser);
 
+		if (source != null) {
+			project.setSource(source);
+		}
+
+		
 		List<Map<String, Object>> users = (List<Map<String, Object>>) projectObject
 				.get("users");
 
@@ -208,4 +219,12 @@ public class Project {
 			return false;
 		return true;
 	}
+
+	public String getSource() {
+		return source;
+	}
+
+	public void setSource(String source) {
+		this.source = source;
+	}
 }
diff --git a/src/java/azkaban/utils/FlowUtils.java b/src/java/azkaban/utils/FlowUtils.java
index 7735ec5..35a7bce 100644
--- a/src/java/azkaban/utils/FlowUtils.java
+++ b/src/java/azkaban/utils/FlowUtils.java
@@ -21,37 +21,38 @@ public class FlowUtils {
 	private static final String DEPENDENCIES = "dependencies";
 	private static final String JOB_SUFFIX = ".job";
 	
-	public static void loadProject(File dir, Map<String, Flow> output, List<String> projectErrors) {
-		String base = dir.getAbsolutePath();
-		
+	public static void loadProject(File dir, Map<String, Flow> output, List<Props> propsList, List<String> projectErrors) {
 		// Load all the project and job files.
 		Map<String,Node> jobMap = new HashMap<String,Node>();
 		Set<String> duplicateJobs = new HashSet<String>();
 		Set<String> errors = new HashSet<String>();
-		loadProjectFromDir(base, dir, jobMap, duplicateJobs, errors);
+		loadProjectFromDir(dir.getPath(), dir, jobMap, propsList, duplicateJobs, errors);
 		
 		// Create edge dependency sets.
 		Map<String, Set<Edge>> dependencies = new HashMap<String, Set<Edge>>();
 		resolveDependencies(jobMap, duplicateJobs, dependencies, errors);
 
 		HashMap<String, Flow> flows = buildFlowsFromDependencies(jobMap, dependencies, errors);
+		output.putAll(flows);
 		projectErrors.addAll(errors);
 	}
 	
-	public static void toJSONStream(Flow flow) {
-		
-	}
-	
 	/**
 	 * Loads all the files, prop and job files. Props are assigned to the job nodes.
 	 */
-	private static void loadProjectFromDir(String baseDir, File dir, Map<String, Node> jobMap, Set<String> duplicateJobs, Set<String> errors) {
+	private static void loadProjectFromDir(String base, File dir, Map<String, Node> jobMap, List<Props> propsList, Set<String> duplicateJobs, Set<String> errors) {
+
 		// Load all property files
 		File[] propertyFiles = dir.listFiles(new SuffixFilter(PROPERTY_SUFFIX));
 		Props parent = null;
 		for (File file: propertyFiles) {
 			try {
 				parent = new Props(parent, file);
+				String relative = getRelativeFilePath(base, file.getPath());
+				parent.setSource(relative);
+				
+				System.out.println("Adding " + relative);
+				propsList.add(parent);
 			} catch (IOException e) {
 				errors.add("Error loading properties " + file.getName() + ":" + e.getMessage());
 			}
@@ -61,8 +62,7 @@ public class FlowUtils {
 		File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX));
 		for (File file: jobFiles) {
 			try {
-				String jobName = getJobName(file, JOB_SUFFIX);
-
+				String jobName = getNameWithoutExtension(file);
 				if (!duplicateJobs.contains(jobName)) {
 					if (jobMap.containsKey(jobName)) {
 						errors.add("Duplicate job names found '" + jobName + "'.");
@@ -71,8 +71,11 @@ public class FlowUtils {
 					}
 					else {
 						Props prop = new Props(parent, file);
+						String relative = getRelativeFilePath(base, file.getPath());
+						prop.setSource(relative);
+						
 						Node node = new Node(jobName, prop);
-					
+
 						jobMap.put(jobName, node);
 					}
 				}
@@ -84,7 +87,7 @@ public class FlowUtils {
 		
 		File[] subDirs = dir.listFiles(DIR_FILTER);
 		for (File file: subDirs) {
-			loadProjectFromDir(baseDir, file, jobMap, duplicateJobs, errors);
+			loadProjectFromDir(base, file, jobMap, propsList, duplicateJobs, errors);
 		}
 		
 	}
@@ -189,9 +192,15 @@ public class FlowUtils {
 		visited.remove(node.getId());
 	}
 	
-	private static String getJobName(File file, String suffix) {
+	private static String getNameWithoutExtension(File file) {
 		String filename = file.getName();
-		return filename.substring(0, filename.length() - JOB_SUFFIX.length());
+		int index = filename.lastIndexOf('.');
+		
+		return index < 0 ? filename : filename.substring(0, index);
+	}
+	
+	private static String getRelativeFilePath(String basePath, String filePath) {
+		return filePath.substring(basePath.length() + 1);
 	}
 
 	private static class DirFilter implements FileFilter {
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 1ca35ce..027fe09 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -19,6 +19,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.fileupload.FileItem;
 import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
@@ -145,7 +146,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         FileItem item = (FileItem) multipart.get("file");
         String forceStr = (String) multipart.get("force");
         boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
-        
+        File projectDir = null;
         if (projectName == null || projectName.isEmpty()) {
         	setErrorMessageInCookie(resp, "No project name found.");
         }
@@ -154,7 +155,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         }
         else {
 	    	try {
-	            File projectDir = extractFile(item);
+	            projectDir = extractFile(item);
 	            manager.uploadProject(projectName, projectDir, user, force);
 	            setSuccessMessageInCookie(resp, "Project Uploaded");
 	    	} 
@@ -163,6 +164,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	    		setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
 	        }
 
+	    	if (projectDir != null && projectDir.exists() ) {
+	    		FileUtils.deleteDirectory(projectDir);
+	    	}
 	    	resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
         }
     }