azkaban-aplcache

Uploading of projects.

6/21/2012 8:30:57 PM

Details

diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index e23edf4..05dc08c 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -61,7 +61,7 @@ public class Flow {
     	String target = edge.getTargetId();
 
     	if (edge instanceof ErrorEdge) {
-    		errors.add(edge);
+    		addError(edge);
     	}
 
     	Set<Edge> sourceSet = getEdgeSet(sourceEdges, source);
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index 833fc42..a8f1f29 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -14,6 +14,7 @@ public class Node {
     
     public Node(String id, Props props) {
         this.id = id;
+        this.props = props;
     }
 
     /**
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 684c024..3d0fe13 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -5,14 +5,19 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 
+import azkaban.flow.ErrorEdge;
+import azkaban.flow.Flow;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
+import azkaban.utils.FlowUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
@@ -110,7 +115,8 @@ public class FileProjectManager implements ProjectManager {
     	return null;
     }
     
-    public void uploadProject(String projectName, File dir, User uploader) throws ProjectManagerException {
+    public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
+		logger.info("Uploading files to " + projectName);
     	Project project = projects.get(projectName);
     	
     	if (project == null) {
@@ -120,15 +126,29 @@ public class FileProjectManager implements ProjectManager {
 			throw new AccessControlException("Permission denied. Do not have write access.");
 		}
     	
-    	// We synchronize on project so that we don't collide when uploading.
-    	synchronized (project) {
-    		logger.info("Uploading files to " + projectName);
-    		
-    		
-    		// Do this if it succeeds
-    		project.setLastModifiedTimestamp(System.currentTimeMillis());
-    		project.setLastModifiedUser(uploader.getUserId());
-    	}
+		Map<String, Flow> flows = new HashMap<String,Flow>();
+		List<String> errors = new ArrayList<String>();
+		FlowUtils.loadProject(dir, flows, errors);
+		
+		// We install only if the project is not forced install or has no errors
+		if (force || errors.isEmpty()) {
+	    	// We synchronize on project so that we don't collide when uploading.
+	    	synchronized (project) {
+	    		logger.info("Uploading files to " + projectName);
+	    		project.setLastModifiedTimestamp(System.currentTimeMillis());
+	    		project.setLastModifiedUser(uploader.getUserId());
+	    	}
+		}
+		else {
+			logger.info("Errors found loading project " + projectName);
+			StringBuffer bufferErrors = new StringBuffer();
+			for(String error : errors) {
+				bufferErrors.append(error);
+				bufferErrors.append("\n");
+			}
+			throw new ProjectManagerException(bufferErrors.toString());
+		}
+		
     }
     
     @Override
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 107738b..fbc0d92 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -14,7 +14,7 @@ public interface ProjectManager {
     
     public Project getProject(String name, User user) throws AccessControlException;
     
-    public void uploadProject(String projectName, File projectDir, User uploader) throws ProjectManagerException;
+    public void uploadProject(String projectName, File projectDir, User uploader, boolean force) throws ProjectManagerException;
     
     public Project createProject(String projectName, String description, User creator) throws ProjectManagerException;
     
diff --git a/src/java/azkaban/project/ProjectManagerException.java b/src/java/azkaban/project/ProjectManagerException.java
index a7b3575..fe16a8c 100644
--- a/src/java/azkaban/project/ProjectManagerException.java
+++ b/src/java/azkaban/project/ProjectManagerException.java
@@ -2,9 +2,6 @@ package azkaban.project;
 
 public class ProjectManagerException extends Exception{
 	private static final long serialVersionUID = 1L;
-	public enum Type {
-		
-	}
 
 	public ProjectManagerException(String message) {
 		super(message);
diff --git a/src/java/azkaban/utils/FlowUtils.java b/src/java/azkaban/utils/FlowUtils.java
index 3723b14..7735ec5 100644
--- a/src/java/azkaban/utils/FlowUtils.java
+++ b/src/java/azkaban/utils/FlowUtils.java
@@ -21,27 +21,31 @@ public class FlowUtils {
 	private static final String DEPENDENCIES = "dependencies";
 	private static final String JOB_SUFFIX = ".job";
 	
-	public static Map<String, Flow> loadProject(File dir) {
+	public static void loadProject(File dir, Map<String, Flow> output, List<String> projectErrors) {
 		String base = dir.getAbsolutePath();
 		
 		// Load all the project and job files.
 		Map<String,Node> jobMap = new HashMap<String,Node>();
 		Set<String> duplicateJobs = new HashSet<String>();
-		List<String> errors = new ArrayList<String>();
+		Set<String> errors = new HashSet<String>();
 		loadProjectFromDir(base, dir, jobMap, duplicateJobs, errors);
 		
 		// Create edge dependency sets.
 		Map<String, Set<Edge>> dependencies = new HashMap<String, Set<Edge>>();
-		resolveDependencies(jobMap, duplicateJobs, dependencies);
+		resolveDependencies(jobMap, duplicateJobs, dependencies, errors);
 
-		HashMap<String, Flow> flows = buildFlowsFromDependencies(jobMap, dependencies);
-		return flows;
+		HashMap<String, Flow> flows = buildFlowsFromDependencies(jobMap, dependencies, errors);
+		projectErrors.addAll(errors);
+	}
+	
+	public static void toJSONStream(Flow flow) {
+		
 	}
 	
 	/**
 	 * Loads all the files, prop and job files. Props are assigned to the job nodes.
 	 */
-	private static void loadProjectFromDir(String baseDir, File dir, Map<String, Node> jobMap, Set<String> duplicateJobs, List<String> errors) {
+	private static void loadProjectFromDir(String baseDir, File dir, Map<String, Node> jobMap, Set<String> duplicateJobs, Set<String> errors) {
 		// Load all property files
 		File[] propertyFiles = dir.listFiles(new SuffixFilter(PROPERTY_SUFFIX));
 		Props parent = null;
@@ -61,6 +65,7 @@ public class FlowUtils {
 
 				if (!duplicateJobs.contains(jobName)) {
 					if (jobMap.containsKey(jobName)) {
+						errors.add("Duplicate job names found '" + jobName + "'.");
 						duplicateJobs.add(jobName);
 						jobMap.remove(jobName);
 					}
@@ -84,12 +89,12 @@ public class FlowUtils {
 		
 	}
 	
-	private static void resolveDependencies(Map<String, Node> jobMap, Set<String> duplicateJobs, Map<String, Set<Edge>> nodeDependencies) {
+	private static void resolveDependencies(Map<String, Node> jobMap, Set<String> duplicateJobs, Map<String, Set<Edge>> nodeDependencies, Set<String> errors) {
 		// Add all the in edges and out edges. Catch bad dependencies and self referrals. Also collect list of nodes who are parents.
 		for (Node node: jobMap.values()) {
 			List<String> dependencyList = node.getProps().getStringList(DEPENDENCIES, (List<String>)null);
 			
-			if (duplicateJobs != null) {
+			if (dependencyList != null) {
 				Set<Edge> dependencies = nodeDependencies.get(node.getId());
 				if (dependencies == null) {
 					dependencies = new HashSet<Edge>();
@@ -99,18 +104,22 @@ public class FlowUtils {
 							continue;
 						}
 						
+						dependencyName = dependencyName.trim();
 						Node dependencyNode = jobMap.get(dependencyName);
 						if (dependencyNode == null) {
 							if (duplicateJobs.contains(dependencyName)) {
 								dependencies.add(new ErrorEdge(dependencyName, node, "Ambiguous Dependency. Duplicates found."));
+								errors.add(node.getId() + " has ambiguous dependency " + dependencyName);
 							}
 							else {
 								dependencies.add(new ErrorEdge(dependencyName, node, "Dependency not found."));
+								errors.add(node.getId() + " cannot find dependency " + dependencyName);
 							}
 						}
 						else if (dependencyNode == node) {
 							// We have a self cycle
 							dependencies.add(new ErrorEdge(dependencyName, node, "Self cycle found."));
+							errors.add(node.getId() + " has a self cycle");
 						}
 						else {
 							dependencies.add(new Edge(dependencyNode, node));
@@ -126,7 +135,7 @@ public class FlowUtils {
 		
 	}
 	
-	private static HashMap<String, Flow> buildFlowsFromDependencies(Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies) {
+	private static HashMap<String, Flow> buildFlowsFromDependencies(Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies, Set<String> errors) {
 		// Find all root nodes by finding ones without dependents.
 		HashSet<String> nonRootNodes = new HashSet<String>();
 		for (Set<Edge> edges: nodeDependencies.values()) {
@@ -144,7 +153,7 @@ public class FlowUtils {
 			if (!nonRootNodes.contains(base)) {
 				Flow flow = new Flow(base.getId());
 				flow.addBaseNode(base);
-				constructFlow(flow, base, nodes, nodeDependencies, visitedNodes);
+				constructFlow(flow, base, nodes, nodeDependencies, visitedNodes, errors);
 				flows.put(base.getId(), flow);
 			}
 		}
@@ -152,7 +161,7 @@ public class FlowUtils {
 		return flows;
 	}
 	
-	private static void constructFlow(Flow flow, Node node, Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies, Set<String> visited) {
+	private static void constructFlow(Flow flow, Node node, Map<String, Node> nodes, Map<String, Set<Edge>> nodeDependencies, Set<String> visited, Set<String> errors) {
 		visited.add(node.getId());
 		flow.addNode(node);
 		Set<Edge> dependencies = nodeDependencies.get(node.getId());
@@ -165,13 +174,14 @@ public class FlowUtils {
 				else if (visited.contains(edge.getSourceId())){
 					// We have a cycle. We set it as an error edge
 					edge = new ErrorEdge(edge.getSourceId(), node, "Cyclical dependencies found.");
+					errors.add("Cyclical dependency found at " + edge.getId());
 					flow.addEdge(edge);
 				}
 				else {
 					// This should not be null
 					flow.addEdge(edge);
 					Node fromNode = edge.getSource();
-					constructFlow(flow, fromNode, nodes, nodeDependencies, visited);					
+					constructFlow(flow, fromNode, nodes, nodeDependencies, visited, errors);					
 				}
 			}
 		}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 5ade9ac..c57fd8b 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -241,11 +241,11 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
         page.add("context", req.getContextPath());
         
         String errorMsg = getErrorMessageFromCookie(req);
-        page.add("error_message", errorMsg == null || errorMsg.isEmpty()? "null": "\"" + errorMsg + "\"");
+        page.add("error_message", errorMsg == null || errorMsg.isEmpty()? "null": errorMsg);
         setErrorMessageInCookie(resp, null);
         
         String successMsg = getSuccessMessageFromCookie(req);
-        page.add("success_message", successMsg == null || successMsg.isEmpty()? "null": "\"" + successMsg + "\"");
+        page.add("success_message", successMsg == null || successMsg.isEmpty()? "null":  successMsg);
         setSuccessMessageInCookie(resp, null);
 
         
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 38bd53c..1ca35ce 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -143,6 +143,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     	User user = session.getUser();
     	String projectName = (String) multipart.get("project");
         FileItem item = (FileItem) multipart.get("file");
+        String forceStr = (String) multipart.get("force");
+        boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
+        
         if (projectName == null || projectName.isEmpty()) {
         	setErrorMessageInCookie(resp, "No project name found.");
         }
@@ -151,11 +154,13 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         }
         else {
 	    	try {
-	            File jobDir = extractFile(item);
-	            
-	        } catch (Exception e) {
+	            File projectDir = extractFile(item);
+	            manager.uploadProject(projectName, projectDir, user, force);
+	            setSuccessMessageInCookie(resp, "Project Uploaded");
+	    	} 
+	    	catch (Exception e) {
 	            logger.info("Installation Failed.", e);
-	            setErrorMessageInCookie(resp, "Installation Failed.");
+	    		setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
 	        }
 
 	    	resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectmanager.vm b/src/java/azkaban/webapp/servlet/velocity/projectmanager.vm
index a88b3e2..e4fea33 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectmanager.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectmanager.vm
@@ -9,12 +9,13 @@
     <script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
     <script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
     <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+    <script type="text/javascript" src="${context}/js/azkaban.project.view.js"></script>
     <script type="text/javascript">
       var contextURL = "${context}";
       var currentTime = ${currentTime};
       var timezone = "${timezone}";
-      var errorMessage = ${error_message};
-      var successMessage = ${success_message};
+      var errorMessage = "${error_message}";
+      var successMessage = "${success_message}";
     </script>
   </head>
   <body>
@@ -26,6 +27,11 @@
 	  #if($errorMsg)
     	  <div class="box-error-message">$errorMsg</div>
       #else
+      	  #if($error_message != "null")
+    	  	<div class="box-error-message">$error_message</div>
+    	  #elseif($success_message != "null")
+    	  	<div class="box-success-message">$success_message</div>
+    	  #end
 	      <div id="all-jobs-content">
 	        <div class="section-hd">
 	          <h2>Project <span>$project.name</span></h2>
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 11ba190..d694eb6 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -634,6 +634,13 @@ tr:hover td {
   font-weight: bold;
 }
 
+.box-success-message {
+  background-color: #4e911e;
+  margin: 0px 2px;
+  padding: 2px 12px;
+  color: white;
+}
+
 .box-error-message {
   background-color: #C00000;
   margin: 0px 2px;