azkaban-aplcache
Changes
src/java/azkaban/flow/Flow.java 94(+94 -0)
src/java/azkaban/flow/Node.java 1(+1 -0)
src/java/azkaban/project/Project.java 23(+21 -2)
src/java/azkaban/utils/FlowUtils.java 39(+24 -15)
Details
src/java/azkaban/flow/Flow.java 94(+94 -0)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 05dc08c..ab104c0 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -5,8 +5,11 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import azkaban.utils.Props;
+
public class Flow {
public enum State {
READY, RUNNING, RUNNING_WITH_FAILURE, FAILED, SUCCEEDED
@@ -14,6 +17,7 @@ public class Flow {
private final String id;
private ArrayList<Node> baseNodes = new ArrayList<Node>();
private HashMap<String, Node> nodes = new HashMap<String, Node>();
+ private HashMap<String, Edge> edges = new HashMap<String, Edge>();
private HashMap<String, Set<Edge>> sourceEdges = new HashMap<String, Set<Edge>>();
private HashMap<String, Set<Edge>> targetEdges = new HashMap<String, Set<Edge>>();
private ArrayList<Object> errors;
@@ -56,6 +60,14 @@ public class Flow {
return errors != null && !errors.isEmpty();
}
+ public Collection<Node> getNodes() {
+ return nodes.values();
+ }
+
+ public Collection<Edge> getEdges() {
+ return edges.values();
+ }
+
public void addEdge(Edge edge) {
String source = edge.getSourceId();
String target = edge.getTargetId();
@@ -70,6 +82,7 @@ public class Flow {
Set<Edge> targetSet = getEdgeSet(targetEdges, target);
targetSet.add(edge);
+ edges.put(edge.getId(), edge);
}
private Set<Edge> getEdgeSet(HashMap<String, Set<Edge>> map, String id) {
@@ -81,4 +94,85 @@ public class Flow {
return edges;
}
+
+ public Map<String,Object> toObject() {
+ HashMap<String, Object> flowObj = new HashMap<String, Object>();
+ flowObj.put("type", "flow");
+ flowObj.put("id", getId());
+ flowObj.put("properties", objectizeProperties());
+ flowObj.put("nodes", objectizeNodes());
+ flowObj.put("edges", objectizeEdges());
+
+ return flowObj;
+ }
+
+ private List<Map<String,Object>> objectizeNodes() {
+ ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+ for (Node node : getNodes()) {
+ HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+ nodeObj.put("id", node.getId());
+ nodeObj.put("props.source", node.getProps().getSource());
+ Props parentProps = node.getProps().getParent();
+
+ if (parentProps != null) {
+ nodeObj.put("inherited.source", parentProps.getSource());
+ }
+ result.add(nodeObj);
+ }
+
+ return result;
+ }
+
+ private List<Map<String,Object>> objectizeEdges() {
+ ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+ for (Edge edge: getEdges()) {
+ HashMap<String, Object> edgeObj = new HashMap<String, Object>();
+ edgeObj.put("id", edge.getId());
+ edgeObj.put("source", edge.getSourceId());
+ edgeObj.put("target", edge.getTargetId());
+ if (edge instanceof ErrorEdge) {
+ ErrorEdge errorEdge = (ErrorEdge)edge;
+ edgeObj.put("error", errorEdge.getError());
+ }
+ result.add(edgeObj);
+ }
+
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Map<String,Object>> objectizeProperties() {
+ ArrayList<Map<String,Object>> result = new ArrayList<Map<String,Object>>();
+
+ HashMap<String, Object> properties = new HashMap<String, Object>();
+ for (Node node: getNodes()) {
+ Props props = node.getProps().getParent();
+ if (props != null) {
+ traverseAndObjectizeProperties(properties, props);
+ }
+ }
+
+ for (Object propMap : properties.values()) {
+ result.add((Map<String,Object>)propMap);
+ }
+
+ return result;
+ }
+
+ private void traverseAndObjectizeProperties(HashMap<String, Object> properties, Props props) {
+ if (props.getSource() == null || properties.containsKey(props.getSource())) {
+ return;
+ }
+
+ HashMap<String, Object> propObj = new HashMap<String,Object>();
+ propObj.put("source", props.getSource());
+ properties.put(props.getSource(), propObj);
+
+ Props parent = props.getParent();
+ if (parent != null) {
+ propObj.put("inherits", parent.getSource());
+
+ traverseAndObjectizeProperties(properties, parent);
+ }
+ }
}
\ No newline at end of file
src/java/azkaban/flow/Node.java 1(+1 -0)
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index a8f1f29..68012f6 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -10,6 +10,7 @@ public class Node {
private final String id;
private State state = State.WAITING;
+
private Props props;
public Node(String id, Props props) {
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 3d0fe13..0ad73bf 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -11,6 +11,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
import azkaban.flow.ErrorEdge;
import azkaban.flow.Flow;
@@ -28,7 +30,10 @@ import azkaban.utils.Props;
*/
public class FileProjectManager implements ProjectManager {
public static final String DIRECTORY_PARAM = "file.project.loader.path";
+ private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
private static final String PROPERTIES_FILENAME = "project.json";
+ private static final String PROJECT_DIRECTORY = "src";
+ private static final String FLOW_EXTENSION = ".flow";
private static final Logger logger = Logger.getLogger(FileProjectManager.class);
private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
@@ -128,13 +133,34 @@ public class FileProjectManager implements ProjectManager {
Map<String, Flow> flows = new HashMap<String,Flow>();
List<String> errors = new ArrayList<String>();
- FlowUtils.loadProject(dir, flows, errors);
+ List<Props> propsList = new ArrayList<Props>();
+ FlowUtils.loadProject(dir, flows, propsList, errors);
+
+ File projectPath = new File(projectDirectory, projectName);
+ File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
+ if (!installDir.mkdir()) {
+ throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
+ }
+
+ for (Flow flow: flows.values()) {
+ try {
+ writeFlowFile(installDir, flow);
+ } catch (IOException e) {
+ throw new ProjectManagerException(
+ "Project directory " + projectName +
+ " cannot be created in " + projectDirectory, e);
+ }
+ }
+
+ File destDirectory = new File(installDir, PROJECT_DIRECTORY);
+ dir.renameTo(destDirectory);
// We install only if the project is not forced install or has no errors
if (force || errors.isEmpty()) {
// We synchronize on project so that we don't collide when uploading.
synchronized (project) {
logger.info("Uploading files to " + projectName);
+ project.setSource(projectDirectory.getName());
project.setLastModifiedTimestamp(System.currentTimeMillis());
project.setLastModifiedUser(uploader.getUserId());
}
@@ -146,6 +172,7 @@ public class FileProjectManager implements ProjectManager {
bufferErrors.append(error);
bufferErrors.append("\n");
}
+
throw new ProjectManagerException(bufferErrors.toString());
}
@@ -200,10 +227,43 @@ public class FileProjectManager implements ProjectManager {
return project;
}
- private void writeProjectFile(File directory, Project project) throws IOException {
+ private synchronized void writeProjectFile(File directory, Project project) throws IOException {
Object object = project.toObject();
- File outputFile = new File(directory, PROPERTIES_FILENAME);
- logger.info("Writing project file " + outputFile);
+ File tmpFile = File.createTempFile("project-",".json", directory);
+
+ if (tmpFile.exists()) {
+ tmpFile.delete();
+ }
+
+ logger.info("Writing project file " + tmpFile);
+ String output = JSONUtils.toJSON(object, true);
+
+ FileWriter writer = new FileWriter(tmpFile);
+ try {
+ writer.write(output);
+ } catch (IOException e) {
+ if (writer != null) {
+ writer.close();
+ }
+
+ throw e;
+ }
+ writer.close();
+
+ File projectFile = new File(directory, PROPERTIES_FILENAME);
+ File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
+
+ projectFile.renameTo(swapFile);
+ tmpFile.renameTo(projectFile);
+ swapFile.delete();
+
+ }
+
+ private void writeFlowFile(File directory, Flow flow) throws IOException {
+ Object object = flow.toObject();
+ String filename = flow.getId() + FLOW_EXTENSION;
+ File outputFile = new File(directory, filename);
+ logger.info("Writing flow file " + outputFile);
String output = JSONUtils.toJSON(object, true);
FileWriter writer = new FileWriter(outputFile);
src/java/azkaban/project/Project.java 23(+21 -2)
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index 12a4934..15ea37f 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -15,8 +15,9 @@ public class Project {
private long createTimestamp;
private long lastModifiedTimestamp;
private String lastModifiedUser;
+ private String source;
private HashMap<String, Permission> userToPermission = new HashMap<String, Permission>();
-
+
public Project(String name) {
this.name = name;
}
@@ -88,6 +89,10 @@ public class Project {
projectObject.put("createTimestamp", createTimestamp);
projectObject.put("lastModifiedTimestamp", lastModifiedTimestamp);
projectObject.put("lastModifiedUser", lastModifiedUser);
+
+ if (source != null) {
+ projectObject.put("source", source);
+ }
ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
for (Map.Entry<String, Permission> entry : userToPermission.entrySet()) {
@@ -112,13 +117,19 @@ public class Project {
.get("createTimestamp"));
long lastModifiedTimestamp = coerceToLong(projectObject
.get("lastModifiedTimestamp"));
-
+ String source = (String)projectObject.get("source");
+
Project project = new Project(name);
project.setDescription(description);
project.setCreateTimestamp(createTimestamp);
project.setLastModifiedTimestamp(lastModifiedTimestamp);
project.setLastModifiedUser(lastModifiedUser);
+ if (source != null) {
+ project.setSource(source);
+ }
+
+
List<Map<String, Object>> users = (List<Map<String, Object>>) projectObject
.get("users");
@@ -208,4 +219,12 @@ public class Project {
return false;
return true;
}
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
}
src/java/azkaban/utils/FlowUtils.java 39(+24 -15)
diff --git a/src/java/azkaban/utils/FlowUtils.java b/src/java/azkaban/utils/FlowUtils.java
index 7735ec5..35a7bce 100644
--- a/src/java/azkaban/utils/FlowUtils.java
+++ b/src/java/azkaban/utils/FlowUtils.java
@@ -21,37 +21,38 @@ public class FlowUtils {
private static final String DEPENDENCIES = "dependencies";
private static final String JOB_SUFFIX = ".job";
- public static void loadProject(File dir, Map<String, Flow> output, List<String> projectErrors) {
- String base = dir.getAbsolutePath();
-
+ public static void loadProject(File dir, Map<String, Flow> output, List<Props> propsList, List<String> projectErrors) {
// Load all the project and job files.
Map<String,Node> jobMap = new HashMap<String,Node>();
Set<String> duplicateJobs = new HashSet<String>();
Set<String> errors = new HashSet<String>();
- loadProjectFromDir(base, dir, jobMap, duplicateJobs, errors);
+ loadProjectFromDir(dir.getPath(), dir, jobMap, propsList, duplicateJobs, errors);
// Create edge dependency sets.
Map<String, Set<Edge>> dependencies = new HashMap<String, Set<Edge>>();
resolveDependencies(jobMap, duplicateJobs, dependencies, errors);
HashMap<String, Flow> flows = buildFlowsFromDependencies(jobMap, dependencies, errors);
+ output.putAll(flows);
projectErrors.addAll(errors);
}
- public static void toJSONStream(Flow flow) {
-
- }
-
/**
* Loads all the files, prop and job files. Props are assigned to the job nodes.
*/
- private static void loadProjectFromDir(String baseDir, File dir, Map<String, Node> jobMap, Set<String> duplicateJobs, Set<String> errors) {
+ private static void loadProjectFromDir(String base, File dir, Map<String, Node> jobMap, List<Props> propsList, Set<String> duplicateJobs, Set<String> errors) {
+
// Load all property files
File[] propertyFiles = dir.listFiles(new SuffixFilter(PROPERTY_SUFFIX));
Props parent = null;
for (File file: propertyFiles) {
try {
parent = new Props(parent, file);
+ String relative = getRelativeFilePath(base, file.getPath());
+ parent.setSource(relative);
+
+ System.out.println("Adding " + relative);
+ propsList.add(parent);
} catch (IOException e) {
errors.add("Error loading properties " + file.getName() + ":" + e.getMessage());
}
@@ -61,8 +62,7 @@ public class FlowUtils {
File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX));
for (File file: jobFiles) {
try {
- String jobName = getJobName(file, JOB_SUFFIX);
-
+ String jobName = getNameWithoutExtension(file);
if (!duplicateJobs.contains(jobName)) {
if (jobMap.containsKey(jobName)) {
errors.add("Duplicate job names found '" + jobName + "'.");
@@ -71,8 +71,11 @@ public class FlowUtils {
}
else {
Props prop = new Props(parent, file);
+ String relative = getRelativeFilePath(base, file.getPath());
+ prop.setSource(relative);
+
Node node = new Node(jobName, prop);
-
+
jobMap.put(jobName, node);
}
}
@@ -84,7 +87,7 @@ public class FlowUtils {
File[] subDirs = dir.listFiles(DIR_FILTER);
for (File file: subDirs) {
- loadProjectFromDir(baseDir, file, jobMap, duplicateJobs, errors);
+ loadProjectFromDir(base, file, jobMap, propsList, duplicateJobs, errors);
}
}
@@ -189,9 +192,15 @@ public class FlowUtils {
visited.remove(node.getId());
}
- private static String getJobName(File file, String suffix) {
+ private static String getNameWithoutExtension(File file) {
String filename = file.getName();
- return filename.substring(0, filename.length() - JOB_SUFFIX.length());
+ int index = filename.lastIndexOf('.');
+
+ return index < 0 ? filename : filename.substring(0, index);
+ }
+
+ private static String getRelativeFilePath(String basePath, String filePath) {
+ return filePath.substring(basePath.length() + 1);
}
private static class DirFilter implements FileFilter {
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 1ca35ce..027fe09 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -19,6 +19,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.fileupload.FileItem;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
@@ -145,7 +146,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
FileItem item = (FileItem) multipart.get("file");
String forceStr = (String) multipart.get("force");
boolean force = forceStr == null ? false : Boolean.parseBoolean(forceStr);
-
+ File projectDir = null;
if (projectName == null || projectName.isEmpty()) {
setErrorMessageInCookie(resp, "No project name found.");
}
@@ -154,7 +155,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
else {
try {
- File projectDir = extractFile(item);
+ projectDir = extractFile(item);
manager.uploadProject(projectName, projectDir, user, force);
setSuccessMessageInCookie(resp, "Project Uploaded");
}
@@ -163,6 +164,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
setErrorMessageInCookie(resp, "Installation Failed.\n" + e.getMessage());
}
+ if (projectDir != null && projectDir.exists() ) {
+ FileUtils.deleteDirectory(projectDir);
+ }
resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
}
}