azkaban-aplcache
Changes
src/java/azkaban/executor/ExecutableFlow.java 115(+111 -4)
src/java/azkaban/flow/Flow.java 4(+4 -0)
src/java/azkaban/flow/Node.java 22(+0 -22)
src/java/azkaban/project/FileProjectManager.java 275(+166 -109)
src/java/azkaban/webapp/AzkabanWebServer.java 734(+374 -360)
src/web/css/azkaban.css 23(+22 -1)
src/web/js/azkaban.flow.view.js 93(+91 -2)
src/web/js/azkaban.main.view.js 2(+1 -1)
src/web/js/azkaban.project.view.js 4(+2 -2)
Details
diff --git a/src/java/azkaban/executor/AzkabanExecutorServer.java b/src/java/azkaban/executor/AzkabanExecutorServer.java
index f781e97..902d6c8 100644
--- a/src/java/azkaban/executor/AzkabanExecutorServer.java
+++ b/src/java/azkaban/executor/AzkabanExecutorServer.java
@@ -19,7 +19,6 @@ package azkaban.executor;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.TimeZone;
src/java/azkaban/executor/ExecutableFlow.java 115(+111 -4)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 580d95d..2898066 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -1,20 +1,127 @@
package azkaban.executor;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import azkaban.flow.Edge;
import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.flow.Node;
import azkaban.utils.Props;
public class ExecutableFlow {
- private Flow flow;
+ private String executionId;
+ private String flowId;
private HashMap<String, Props> sourceProps = new HashMap<String, Props>();
+ private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
+ private HashMap<String, ExecutableNode> executableNodes;
+ private ArrayList<String> startNodes = new ArrayList<String>();
- public ExecutableFlow(Flow flow, HashMap<String,Props> sourceProps) {
- this.flow = flow;
- this.sourceProps = sourceProps;
+ public enum Status {
+ FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
+ }
+
+ private ExecutableFlow() {
+
}
public void run() {
}
+
+ public void setProps(String source, Props props) {
+ sourceProps.put(source, props);
+ }
+
+ public void setStatus(String nodeId, Status status) {
+ ExecutableNode exNode = executableNodes.get(nodeId);
+ exNode.setStatus(status);
+ }
+
+ public static ExecutableFlow createExecutableFlow(Flow flow, HashMap<String, Props> sourceProps) {
+ ExecutableFlow exflow = new ExecutableFlow();
+ exflow.flowId = flow.getId();
+
+ // We make a copy so that it's effectively immutable
+ exflow.sourceProps = new HashMap<String, Props>();
+ exflow.sourceProps.putAll(sourceProps);
+
+ HashMap<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ for (Node node: flow.getNodes()) {
+ String id = node.getId();
+ ExecutableNode exNode = new ExecutableNode(node);
+ nodeMap.put(id, exNode);
+ }
+
+ for (Edge edge: flow.getEdges()) {
+ ExecutableNode sourceNode = nodeMap.get(edge.getSourceId());
+ ExecutableNode targetNode = nodeMap.get(edge.getTargetId());
+
+ sourceNode.addOutNode(edge.getTargetId());
+ targetNode.addInNode(edge.getSourceId());
+ }
+
+ for (ExecutableNode node : nodeMap.values()) {
+ if (node.getInNodes().size()==0) {
+ exflow.startNodes.add(node.id);
+ }
+ }
+
+ exflow.executableNodes = nodeMap;
+ return exflow;
+ }
+
+ public String getExecutionId() {
+ return executionId;
+ }
+
+ public void setExecutionId(String executionId) {
+ this.executionId = executionId;
+ }
+
+ private static class ExecutableNode {
+ private String id;
+ private String jobPropsSource;
+ private String inheritPropsSource;
+ private Status status;
+
+ private Set<String> inNodes = new HashSet<String>();
+ private Set<String> outNodes = new HashSet<String>();
+
+ private ExecutableNode(Node node) {
+ id = node.getId();
+ jobPropsSource = node.getJobSource();
+ inheritPropsSource = node.getPropsSource();
+ status = Status.READY;
+ }
+
+ public void addInNode(String exNode) {
+ inNodes.add(exNode);
+ }
+
+ public void addOutNode(String exNode) {
+ outNodes.add(exNode);
+ }
+
+ public Set<String> getOutNodes() {
+ return outNodes;
+ }
+
+ public Set<String> getInNodes() {
+ return inNodes;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+ }
}
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
new file mode 100644
index 0000000..5c6476a
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -0,0 +1,9 @@
+package azkaban.executor;
+
+public interface ExecutorLoader {
+ public String getUniqueExecutionId();
+
+ public void commitExecutableFlow(ExecutableFlow exflow);
+
+ public ExecutableFlow loadExecutableFlow(String executionId);
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index a099c60..f739ef1 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1,19 +1,29 @@
package azkaban.executor;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
import azkaban.utils.Props;
public class ExecutorManager {
+ private static Logger logger = Logger.getLogger(ExecutorManager.class);
+ private AtomicLong counter = new AtomicLong();
private String token;
+ private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
+
public ExecutorManager(Props props) {
token = props.getString("executor.shared.token", "");
+ counter.set(0);
}
- public void executeJob() {
+ public void executeFlow(ExecutableFlow flow) {
}
- private class ExecutorThread extends Thread {
+ private class ExecutingFlow implements Runnable {
public void run() {
}
diff --git a/src/java/azkaban/executor/ExecutorServlet.java b/src/java/azkaban/executor/ExecutorServlet.java
index 3d57999..dbc5c59 100644
--- a/src/java/azkaban/executor/ExecutorServlet.java
+++ b/src/java/azkaban/executor/ExecutorServlet.java
@@ -9,6 +9,10 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
public class ExecutorServlet extends HttpServlet {
+ public enum State {
+ FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
+ }
+
private static final Logger logger = Logger.getLogger(ExecutorServlet.class.getName());
private String sharedToken;
diff --git a/src/java/azkaban/executor/FileExecutorLoader.java b/src/java/azkaban/executor/FileExecutorLoader.java
new file mode 100644
index 0000000..b93980c
--- /dev/null
+++ b/src/java/azkaban/executor/FileExecutorLoader.java
@@ -0,0 +1,18 @@
+package azkaban.executor;
+
+public class FileExecutorLoader implements ExecutorLoader {
+ @Override
+ public String getUniqueExecutionId() {
+ return "";
+ }
+
+ @Override
+ public void commitExecutableFlow(ExecutableFlow flow) {
+
+ }
+
+ @Override
+ public ExecutableFlow loadExecutableFlow(String executionId) {
+ return null;
+ }
+}
src/java/azkaban/flow/Flow.java 4(+4 -0)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 86cd159..e35eee2 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -304,4 +304,8 @@ public class Flow {
public FlowProps getFlowProps(String propSource) {
return flowProps.get(propSource);
}
+
+ public Map<String, FlowProps> getAllFlowProps() {
+ return flowProps;
+ }
}
\ No newline at end of file
src/java/azkaban/flow/Node.java 22(+0 -22)
diff --git a/src/java/azkaban/flow/Node.java b/src/java/azkaban/flow/Node.java
index 4775757..2192468 100644
--- a/src/java/azkaban/flow/Node.java
+++ b/src/java/azkaban/flow/Node.java
@@ -7,14 +7,10 @@ import java.util.Map;
import azkaban.utils.Utils;
public class Node {
- public enum State {
- FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED
- }
private final String id;
private String jobSource;
private String propsSource;
- private State state = State.WAITING;
private Point2D position = null;
private int level;
@@ -33,16 +29,11 @@ public class Node {
this.id = clone.id;
this.propsSource = clone.propsSource;
this.jobSource = clone.jobSource;
- this.state = clone.state;
}
public String getId() {
return id;
}
-
- public State getState() {
- return state;
- }
public String getType() {
return type;
@@ -51,10 +42,6 @@ public class Node {
public void setType(String type) {
this.type = type;
}
-
- public void setState(State state) {
- this.state = state;
- }
public Point2D getPosition() {
return position;
@@ -118,14 +105,6 @@ public class Node {
if (expectedRuntime != null) {
node.setExpectedRuntimeSec(expectedRuntime);
}
-
- String stateStr = (String)mapObj.get("status");
- if (stateStr != null) {
- State state = State.valueOf(stateStr);
- if (state != null) {
- node.setState(state);
- }
- }
Map<String,Object> layoutInfo = (Map<String,Object>)mapObj.get("layout");
if (layoutInfo != null) {
@@ -160,7 +139,6 @@ public class Node {
objMap.put("prop.source", propsSource);
objMap.put("job.type", type);
objMap.put("expectedRuntime", expectedRunTimeSec);
- objMap.put("state", state.toString());
HashMap<String, Object> layoutInfo = new HashMap<String, Object>();
if (position != null) {
src/java/azkaban/project/FileProjectManager.java 275(+166 -109)
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 6bb7074..94d5b6b 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -6,6 +6,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -22,6 +23,8 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.flow.Node;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
@@ -30,9 +33,9 @@ import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
/**
- * A project loader that stores everything on local file system.
- * The following global parameters should be set -
- * file.project.loader.path - The project install path where projects will be loaded installed to.
+ * A project loader that stores everything on local file system. The following
+ * global parameters should be set - file.project.loader.path - The project
+ * install path where projects will be loaded installed to.
*/
public class FileProjectManager implements ProjectManager {
public static final String DIRECTORY_PARAM = "file.project.loader.path";
@@ -42,10 +45,11 @@ public class FileProjectManager implements ProjectManager {
private static final String FLOW_EXTENSION = ".flow";
private static final Logger logger = Logger.getLogger(FileProjectManager.class);
private static final int IDLE_SECONDS = 120;
+
private ConcurrentHashMap<String, Project> projects = new ConcurrentHashMap<String, Project>();
private CacheManager manager = CacheManager.create();
private Cache sourceCache;
-
+
private File projectDirectory;
public FileProjectManager(Props props) {
@@ -55,32 +59,33 @@ public class FileProjectManager implements ProjectManager {
}
private void setupCache() {
- CacheConfiguration cacheConfig =
- new CacheConfiguration("propsCache", 2000)
- .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
- .overflowToDisk(false)
- .eternal(false)
- .timeToIdleSeconds(IDLE_SECONDS)
- .diskPersistent(false)
- .diskExpiryThreadIntervalSeconds(0);
+ CacheConfiguration cacheConfig = new CacheConfiguration("propsCache",2000)
+ .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
+ .overflowToDisk(false)
+ .eternal(false)
+ .timeToIdleSeconds(IDLE_SECONDS)
+ .diskPersistent(false)
+ .diskExpiryThreadIntervalSeconds(0);
sourceCache = new Cache(cacheConfig);
manager.addCache(sourceCache);
}
-
+
private void setupDirectories(Props props) {
String projectDir = props.getString(DIRECTORY_PARAM);
logger.info("Using directory " + projectDir + " as the project directory.");
projectDirectory = new File(projectDir);
+
if (!projectDirectory.exists()) {
logger.info("Directory " + projectDir + " doesn't exist. Creating.");
if (projectDirectory.mkdirs()) {
logger.info("Directory creation was successful.");
- }
+ }
else {
- throw new RuntimeException("FileProjectLoader cannot create directory " + projectDirectory);
+ throw new RuntimeException(
+ "FileProjectLoader cannot create directory " + projectDirectory);
}
- }
+ }
else if (projectDirectory.isFile()) {
throw new RuntimeException("FileProjectManager directory " + projectDirectory + " is really a file.");
}
@@ -88,67 +93,77 @@ public class FileProjectManager implements ProjectManager {
private void loadAllProjects() {
File[] directories = projectDirectory.listFiles();
-
- for (File dir: directories) {
+
+ for (File dir : directories) {
if (!dir.isDirectory()) {
- logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory." );
- }
+ logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory.");
+ }
else {
File propertiesFile = new File(dir, PROPERTIES_FILENAME);
if (!propertiesFile.exists()) {
- logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " not found." );
- }
+ logger.error("ERROR loading project from " + dir.getPath()
+ + ". Project file " + PROPERTIES_FILENAME
+ + " not found.");
+ }
else {
Object obj = null;
try {
obj = JSONUtils.parseJSONFromFile(propertiesFile);
- } catch (IOException e) {
- logger.error("ERROR loading project from " + dir.getPath() + ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e );
+ }
+ catch (IOException e) {
+ logger.error( "ERROR loading project from " + dir.getPath()
+ + ". Project file " + PROPERTIES_FILENAME + " couldn't be read.", e);
continue;
}
Project project = Project.projectFromObject(obj);
logger.info("Loading project " + project.getName());
projects.put(project.getName(), project);
-
+
String source = project.getSource();
if (source == null) {
logger.info(project.getName() + ": No flows uploaded");
continue;
}
-
+
File projectDir = new File(dir, source);
if (!projectDir.exists()) {
logger.error("ERROR project source dir " + projectDir + " doesn't exist.");
- }
+ }
else if (!projectDir.isDirectory()) {
logger.error("ERROR project source dir " + projectDir + " is not a directory.");
- }
+ }
else {
File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_EXTENSION));
+
Map<String, Flow> flowMap = new LinkedHashMap<String, Flow>();
- for (File flowFile: flowFiles) {
+ for (File flowFile : flowFiles) {
Object objectizedFlow = null;
try {
objectizedFlow = JSONUtils.parseJSONFromFile(flowFile);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
logger.error("Error parsing flow file " + flowFile.toString(), e);
continue;
}
- //Recreate Flow
+ // Recreate Flow
Flow flow = null;
-
+
try {
flow = Flow.flowFromObject(objectizedFlow);
- }
+ }
catch (Exception e) {
- logger.error("Error loading flow " + flowFile.getName() + " in project " + project.getName(), e);
+ logger.error(
+ "Error loading flow "
+ + flowFile.getName()
+ + " in project "
+ + project.getName(), e);
continue;
}
logger.debug("Loaded flow " + project.getName() + ": " + flow.getId());
flow.initialize();
-
+
flowMap.put(flow.getId(), flow);
}
@@ -160,14 +175,14 @@ public class FileProjectManager implements ProjectManager {
}
}
}
-
+
public List<String> getProjectNames() {
return new ArrayList<String>(projects.keySet());
}
-
+
public List<Project> getProjects(User user) {
ArrayList<Project> array = new ArrayList<Project>();
- for(Project project : projects.values()) {
+ for (Project project : projects.values()) {
Permission perm = project.getUserPermission(user);
if (perm != null && (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ))) {
@@ -183,9 +198,8 @@ public class FileProjectManager implements ProjectManager {
Permission perm = project.getUserPermission(user);
if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
return project;
- }
- else {
- throw new AccessControlException("Permission denied. Do not have read access.");
+ } else {
+ throw new AccessControlException( "Permission denied. Do not have read access.");
}
}
return project;
@@ -194,7 +208,7 @@ public class FileProjectManager implements ProjectManager {
public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
logger.info("Uploading files to " + projectName);
Project project = projects.get(projectName);
-
+
if (project == null) {
throw new ProjectManagerException("Project not found.");
}
@@ -206,14 +220,15 @@ public class FileProjectManager implements ProjectManager {
DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
loader.loadProjectFlow(dir);
Map<String, Flow> flows = loader.getFlowMap();
-
+
File projectPath = new File(projectDirectory, projectName);
File installDir = new File(projectPath, FILE_DATE_FORMAT.print(System.currentTimeMillis()));
+
if (!installDir.mkdir()) {
throw new ProjectManagerException("Cannot create directory in " + projectDirectory);
}
-
- for (Flow flow: flows.values()) {
+
+ for (Flow flow : flows.values()) {
try {
if (flow.getErrors() != null) {
errors.addAll(flow.getErrors());
@@ -221,18 +236,21 @@ public class FileProjectManager implements ProjectManager {
flow.initialize();
writeFlowFile(installDir, flow);
- } catch (IOException e) {
- throw new ProjectManagerException(
- "Project directory " + projectName + " cannot be created in " + projectDirectory, e);
+ }
+ catch (IOException e) {
+ throw new ProjectManagerException("Project directory "
+ + projectName + " cannot be created in "
+ + projectDirectory, e);
}
}
-
+
File destDirectory = new File(installDir, PROJECT_DIRECTORY);
dir.renameTo(destDirectory);
-
+
// We install only if the project is not forced install or has no errors
if (force || errors.isEmpty()) {
- // We synchronize on project so that we don't collide when uploading.
+ // We synchronize on project so that we don't collide when
+ // uploading.
synchronized (project) {
logger.info("Uploading files to " + projectName);
project.setSource(installDir.getName());
@@ -240,170 +258,185 @@ public class FileProjectManager implements ProjectManager {
project.setLastModifiedUser(uploader.getUserId());
project.setFlows(flows);
}
-
+
try {
writeProjectFile(projectPath, project);
- } catch (IOException e) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory, e);
+ }
+ catch (IOException e) {
+ throw new ProjectManagerException("Project directory "
+ + projectName + " cannot be created in "
+ + projectDirectory, e);
}
- }
+ }
else {
logger.info("Errors found loading project " + projectName);
StringBuffer bufferErrors = new StringBuffer();
- for(String error : errors) {
+ for (String error : errors) {
bufferErrors.append(error);
bufferErrors.append("\n");
}
-
+
throw new ProjectManagerException(bufferErrors.toString());
}
}
@Override
- public synchronized Project createProject(String projectName, String description, User creator) throws ProjectManagerException {
+ public synchronized Project createProject(String projectName,
+ String description, User creator) throws ProjectManagerException {
if (projectName == null || projectName.trim().isEmpty()) {
throw new ProjectManagerException("Project name cannot be empty.");
- }
+ }
else if (description == null || description.trim().isEmpty()) {
throw new ProjectManagerException("Description cannot be empty.");
- }
+ }
else if (creator == null) {
throw new ProjectManagerException("Valid creator user must be set.");
- }
- else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")){
+ }
+ else if (!projectName.matches("[a-zA-Z][a-zA-Z_0-9|-]*")) {
throw new ProjectManagerException("Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
}
-
+
if (projects.contains(projectName)) {
throw new ProjectManagerException("Project already exists.");
}
-
+
File projectPath = new File(projectDirectory, projectName);
if (projectPath.exists()) {
throw new ProjectManagerException("Project already exists.");
}
-
- if(!projectPath.mkdirs()) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory);
+
+ if (!projectPath.mkdirs()) {
+ throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory);
}
-
+
Permission perm = new Permission(Type.ADMIN);
long time = System.currentTimeMillis();
-
+
Project project = new Project(projectName);
project.setUserPermission(creator.getUserId(), perm);
project.setDescription(description);
project.setCreateTimestamp(time);
project.setLastModifiedTimestamp(time);
project.setLastModifiedUser(creator.getUserId());
-
+
logger.info("Trying to create " + project.getName() + " by user " + creator.getUserId());
try {
writeProjectFile(projectPath, project);
- } catch (IOException e) {
- throw new ProjectManagerException(
- "Project directory " + projectName +
- " cannot be created in " + projectDirectory, e);
+ }
+ catch (IOException e) {
+ throw new ProjectManagerException("Project directory " + projectName + " cannot be created in " + projectDirectory, e);
}
projects.put(projectName, project);
+
return project;
}
-
+
private synchronized void writeProjectFile(File directory, Project project) throws IOException {
Object object = project.toObject();
- File tmpFile = File.createTempFile("project-",".json", directory);
-
+ File tmpFile = File.createTempFile("project-", ".json", directory);
+
if (tmpFile.exists()) {
tmpFile.delete();
}
-
+
logger.info("Writing project file " + tmpFile);
String output = JSONUtils.toJSON(object, true);
-
+
FileWriter writer = new FileWriter(tmpFile);
try {
writer.write(output);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
if (writer != null) {
writer.close();
}
-
+
throw e;
}
writer.close();
-
+
File projectFile = new File(directory, PROPERTIES_FILENAME);
File swapFile = new File(directory, PROPERTIES_FILENAME + "_old");
-
+
projectFile.renameTo(swapFile);
tmpFile.renameTo(projectFile);
swapFile.delete();
-
+
}
-
+
private void writeFlowFile(File directory, Flow flow) throws IOException {
Object object = flow.toObject();
String filename = flow.getId() + FLOW_EXTENSION;
File outputFile = new File(directory, filename);
File oldOutputFile = new File(directory, filename + ".old");
-
+
if (outputFile.exists()) {
outputFile.renameTo(oldOutputFile);
}
-
+
logger.info("Writing flow file " + outputFile);
String output = JSONUtils.toJSON(object, true);
-
+
FileWriter writer = new FileWriter(outputFile);
try {
writer.write(output);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
if (writer != null) {
writer.close();
}
-
+
throw e;
}
writer.close();
-
+
if (oldOutputFile.exists()) {
oldOutputFile.delete();
}
}
-
- public Props getProperties(String projectName, String source, User user) throws ProjectManagerException {
+
+ @Override
+ public Props getProperties(String projectName, String source, User user)
+ throws ProjectManagerException {
Project project = projects.get(projectName);
if (project == null) {
throw new ProjectManagerException("Project " + project + " cannot be found.");
}
+
+ return getProperties(project, source, user);
+ }
+
+ @Override
+ public Props getProperties(Project project, String source, User user)
+ throws ProjectManagerException {
if (!project.hasPermission(user, Type.READ)) {
- throw new AccessControlException("Permission denied. Do not have read access.");
+ throw new AccessControlException(
+ "Permission denied. Do not have read access.");
}
- String mySource = projectName + File.separatorChar + project.getSource() + File.separatorChar + "src" + File.separatorChar + source;
+ String mySource = project.getName() + File.separatorChar
+ + project.getSource() + File.separatorChar + "src"
+ + File.separatorChar + source;
Element sourceElement = sourceCache.get(mySource);
if (sourceElement != null) {
- return Props.clone((Props)sourceElement.getObjectValue());
+ return Props.clone((Props) sourceElement.getObjectValue());
}
-
+
File file = new File(projectDirectory, mySource);
if (!file.exists()) {
throw new ProjectManagerException("Source file " + file.getAbsolutePath() + " doesn't exist.");
}
try {
- Props props = new Props((Props)null, file);
+ Props props = new Props((Props) null, file);
return props;
- } catch (IOException e) {
+ }
+ catch (IOException e) {
throw new ProjectManagerException("Error loading file " + file.getPath(), e);
}
}
-
+
@Override
public synchronized Project removeProject(String projectName, User user) {
return null;
@@ -411,15 +444,15 @@ public class FileProjectManager implements ProjectManager {
private static class SuffixFilter implements FileFilter {
private String suffix;
-
+
public SuffixFilter(String suffix) {
this.suffix = suffix;
}
-
+
@Override
public boolean accept(File pathname) {
String name = pathname.getName();
-
+
return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
}
}
@@ -430,15 +463,39 @@ public class FileProjectManager implements ProjectManager {
if (project == null) {
throw new ProjectManagerException("Project " + projectName + " doesn't exist.");
}
-
+
File projectPath = new File(projectDirectory, projectName);
try {
writeProjectFile(projectPath, project);
- }
+ }
catch (IOException e) {
throw new ProjectManagerException("Error committing project " + projectName, e);
}
}
+ @Override
+ public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException {
+ Flow flow = project.getFlow(flowId);
+ if (flow == null) {
+ throw new ProjectManagerException("Flow " + flowId + " doesn't exist in " + project.getName());
+ }
+
+ // Resolve all the node probs
+ HashMap<String, Props> sourceMap = new HashMap<String, Props>();
+ for (Node node : flow.getNodes()) {
+ String source = node.getJobSource();
+ Props props = getProperties(project, node.getJobSource(), user);
+ sourceMap.put(source, props);
+ }
+
+ // Resolve all the shared props.
+ for(FlowProps flowProps: flow.getAllFlowProps().values()) {
+ String source = flowProps.getSource();
+ Props props = getProperties(project, source, user);
+ sourceMap.put(source, props);
+ }
+
+ return sourceMap;
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 43f654c..4abf57c 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -1,6 +1,7 @@
package azkaban.project;
import java.io.File;
+import java.util.HashMap;
import java.util.List;
import azkaban.user.User;
@@ -23,4 +24,8 @@ public interface ProjectManager {
public Project removeProject(String projectName, User user) throws ProjectManagerException;
public Props getProperties(String projectName, String source, User user) throws ProjectManagerException;
+
+ public Props getProperties(Project project, String source, User user) throws ProjectManagerException;
+
+ public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException;
}
\ No newline at end of file
src/java/azkaban/webapp/AzkabanWebServer.java 734(+374 -360)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index d56eae7..d14ed46 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -35,6 +35,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import azkaban.executor.ExecutorManager;
import azkaban.project.FileProjectManager;
import azkaban.project.ProjectManager;
import azkaban.user.UserManager;
@@ -42,6 +43,7 @@ import azkaban.user.XmlUserManager;
import azkaban.utils.Props;
import azkaban.utils.Utils;
import azkaban.webapp.servlet.AzkabanServletContextListener;
+import azkaban.webapp.servlet.FlowExecutorServlet;
import azkaban.webapp.servlet.IndexServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
import azkaban.webapp.session.SessionCache;
@@ -53,373 +55,385 @@ import joptsimple.OptionSpec;
/**
* The Azkaban Jetty server class
*
- * Global azkaban properties for setup. All of them are optional unless otherwise marked:
- * azkaban.name - The displayed name of this instance.
- * azkaban.label - Short descriptor of this Azkaban instance.
- * azkaban.color - Theme color
- * azkaban.temp.dir - Temp dir used by Azkaban for various file uses.
- * web.resource.dir - The directory that contains the static web files.
+ * Global azkaban properties for setup. All of them are optional unless
+ * otherwise marked: azkaban.name - The displayed name of this instance.
+ * azkaban.label - Short descriptor of this Azkaban instance. azkaban.color -
+ * Theme color azkaban.temp.dir - Temp dir used by Azkaban for various file
+ * uses. web.resource.dir - The directory that contains the static web files.
* default.timezone.id - The timezone code. I.E. America/Los Angeles
*
- * user.manager.class - The UserManager class used for the user manager. Default is XmlUserManager.
- * project.manager.class - The ProjectManager to load projects
- * project.global.properties - The base properties inherited by all projects and jobs
+ * user.manager.class - The UserManager class used for the user manager. Default
+ * is XmlUserManager. project.manager.class - The ProjectManager to load
+ * projects project.global.properties - The base properties inherited by all
+ * projects and jobs
*
- * jetty.maxThreads - # of threads for jetty
- * jetty.ssl.port - The ssl port used for sessionizing.
- * jetty.keystore - Jetty keystore .
- * jetty.keypassword - Jetty keystore password
- * jetty.truststore - Jetty truststore
- * jetty.trustpassword - Jetty truststore password
+ * jetty.maxThreads - # of threads for jetty jetty.ssl.port - The ssl port used
+ * for sessionizing. jetty.keystore - Jetty keystore . jetty.keypassword - Jetty
+ * keystore password jetty.truststore - Jetty truststore jetty.trustpassword -
+ * Jetty truststore password
*/
public class AzkabanWebServer {
- private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
-
- public static final String AZKABAN_HOME = "AZKABAN_HOME";
- public static final String DEFAULT_CONF_PATH = "conf";
- public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
-
- private static AzkabanWebServer app;
-
- private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
- //private static final int DEFAULT_PORT_NUMBER = 8081;
- private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
- private static final int DEFAULT_THREAD_NUMBER = 20;
- private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
- private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
- private static final String PROJECT_MANAGER_CLASS_PARAM = "project.manager.class";
- private static final String DEFAULT_STATIC_DIR = "";
-
- private final VelocityEngine velocityEngine;
- private UserManager userManager;
- private ProjectManager projectManager;
-
- private Props props;
- private SessionCache sessionCache;
- private File tempDir;
-
- /**
- * Constructor usually called by tomcat AzkabanServletContext to create the
- * initial server
- */
- public AzkabanWebServer() {
- this(loadConfigurationFromAzkabanHome());
- }
+ private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
+
+ public static final String AZKABAN_HOME = "AZKABAN_HOME";
+ public static final String DEFAULT_CONF_PATH = "conf";
+ public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+
+ private static AzkabanWebServer app;
+
+ private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+ // private static final int DEFAULT_PORT_NUMBER = 8081;
+ private static final int DEFAULT_SSL_PORT_NUMBER = 8443;
+ private static final int DEFAULT_THREAD_NUMBER = 20;
+ private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
+ private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
+ private static final String PROJECT_MANAGER_CLASS_PARAM = "project.manager.class";
+ private static final String DEFAULT_STATIC_DIR = "";
+
+ private final VelocityEngine velocityEngine;
+ private UserManager userManager;
+ private ProjectManager projectManager;
+ private ExecutorManager executorManager;
+
+ private Props props;
+ private SessionCache sessionCache;
+ private File tempDir;
+
+ /**
+ * Constructor usually called by tomcat AzkabanServletContext to create the
+ * initial server
+ */
+ public AzkabanWebServer() {
+ this(loadConfigurationFromAzkabanHome());
+ }
+
+ /**
+ * Constructor
+ */
+ public AzkabanWebServer(Props props) {
+ this.props = props;
+ velocityEngine = configureVelocityEngine(props.getBoolean( VELOCITY_DEV_MODE_PARAM, false));
+ sessionCache = new SessionCache(props);
+ userManager = loadUserManager(props);
+ projectManager = loadProjectManager(props);
+ executorManager = loadExecutorManager(props);
+
+ tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
+
+ // Setup time zone
+ if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
+ String timezone = props.getString(DEFAULT_TIMEZONE_ID);
+ TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+ DateTimeZone.setDefault(DateTimeZone.forID(timezone));
- /**
- * Constructor
- */
- public AzkabanWebServer(Props props) {
- this.props = props;
- velocityEngine = configureVelocityEngine(props.getBoolean(
- VELOCITY_DEV_MODE_PARAM, false));
- sessionCache = new SessionCache(props);
- userManager = loadUserManager(props);
- projectManager = loadProjectManager(props);
-
- tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
-
- // Setup time zone
- if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
- String timezone = props.getString(DEFAULT_TIMEZONE_ID);
- TimeZone.setDefault(TimeZone.getTimeZone(timezone));
- DateTimeZone.setDefault(DateTimeZone.forID(timezone));
-
logger.info("Setting timezone to " + timezone);
- }
-
- }
-
- private UserManager loadUserManager(Props props) {
- Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM,
- null);
- logger.info("Loading user manager class " + userManagerClass.getName());
- UserManager manager = null;
-
- if (userManagerClass != null
- && userManagerClass.getConstructors().length > 0) {
-
- try {
- Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
- manager = (UserManager)userManagerConstructor.newInstance(props);
- }
- catch (Exception e) {
- logger.error("Could not instantiate UserManager "
- + userManagerClass.getName());
- throw new RuntimeException(e);
- }
-
- } else {
- manager = new XmlUserManager(props);
- }
-
- return manager;
- }
-
- private ProjectManager loadProjectManager(Props props) {
- Class<?> projectManagerClass = props.getClass(PROJECT_MANAGER_CLASS_PARAM, null);
- logger.info("Loading project manager class " + projectManagerClass.getName());
- ProjectManager manager = null;
-
- if (projectManagerClass != null
- && projectManagerClass.getConstructors().length > 0) {
-
- try {
- Constructor<?> projectManagerConstructor = projectManagerClass.getConstructor(Props.class);
- manager = (ProjectManager)projectManagerConstructor.newInstance(props);
- }
- catch (Exception e) {
- logger.error("Could not instantiate ProjectManager "
- + projectManagerClass.getName());
- throw new RuntimeException(e);
- }
-
- } else {
- manager = new FileProjectManager(props);
- }
-
- return manager;
- }
-
- /**
- * Returns the web session cache.
- *
- * @return
- */
- public SessionCache getSessionCache() {
- return sessionCache;
- }
-
- /**
- * Returns the velocity engine for pages to use.
- *
- * @return
- */
- public VelocityEngine getVelocityEngine() {
- return velocityEngine;
- }
-
- /**
- *
- * @return
- */
- public UserManager getUserManager() {
- return userManager;
- }
-
- /**
- *
- * @return
- */
- public ProjectManager getProjectManager() {
- return projectManager;
- }
-
- /**
- * Creates and configures the velocity engine.
- *
- * @param devMode
- * @return
- */
- private VelocityEngine configureVelocityEngine(final boolean devMode) {
- VelocityEngine engine = new VelocityEngine();
- engine.setProperty("resource.loader", "classpath");
- engine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
- engine.setProperty("classpath.resource.loader.cache", !devMode);
- engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
- engine.setProperty("resource.manager.logwhenfound", false);
- engine.setProperty("input.encoding", "UTF-8");
- engine.setProperty("output.encoding", "UTF-8");
- engine.setProperty("directive.set.null.allowed", true);
- engine.setProperty("resource.manager.logwhenfound", false);
- engine.setProperty("velocimacro.permissions.allow.inline", true);
- engine.setProperty("velocimacro.library.autoreload", devMode);
- engine.setProperty("velocimacro.library", "/azkaban/webapp/servlet/velocity/macros.vm");
- engine.setProperty("velocimacro.permissions.allow.inline.to.replace.global", true);
- engine.setProperty("velocimacro.arguments.strict", true);
- engine.setProperty("runtime.log.invalid.references", devMode);
- engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
- engine.setProperty("runtime.log.logsystem.log4j.logger", Logger.getLogger("org.apache.velocity.Logger"));
- engine.setProperty("parser.pool.size", 3);
- return engine;
- }
-
- /**
- * Returns the global azkaban properties
- *
- * @return
- */
- public Props getAzkabanProps() {
- return props;
- }
-
- /**
- * Azkaban using Jetty
- *
- * @param args
- */
- public static void main(String[] args) {
- OptionParser parser = new OptionParser();
-
- OptionSpec<String> configDirectory = parser
- .acceptsAll(Arrays.asList("c", "conf"),"The conf directory for Azkaban.")
- .withRequiredArg()
- .describedAs("conf")
- .ofType(String.class);
-
- logger.error("Starting Jetty Azkaban...");
-
- // Grabbing the azkaban settings from the conf directory.
- Props azkabanSettings = null;
- OptionSet options = parser.parse(args);
- if (options.has(configDirectory)) {
- String path = options.valueOf(configDirectory);
- logger.info("Loading azkaban settings file from " + path);
- File file = new File(path, AZKABAN_PROPERTIES_FILE);
- if (!file.exists() || file.isDirectory() || !file.canRead()) {
- logger.error("Cannot read file " + file);
- }
-
- azkabanSettings = loadAzkabanConfiguration(file.getPath());
- } else {
- logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
- azkabanSettings = loadConfigurationFromAzkabanHome();
- }
-
- if (azkabanSettings == null) {
- // one last chance to
- }
-
- if (azkabanSettings == null) {
- logger.error("Azkaban Properties not loaded.");
- logger.error("Exiting Azkaban...");
- return;
- }
- app = new AzkabanWebServer(azkabanSettings);
-
- //int portNumber = azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
- int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",DEFAULT_SSL_PORT_NUMBER);
- int maxThreads = azkabanSettings.getInt("jetty.maxThreads",DEFAULT_THREAD_NUMBER);
-
- logger.info("Setting up Jetty Server with port:" + sslPortNumber
- + " and numThreads:" + maxThreads);
-
- final Server server = new Server();
- SslSocketConnector secureConnector = new SslSocketConnector();
- secureConnector.setPort(sslPortNumber);
- secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
- secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
- secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
- secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
- secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
- server.addConnector(secureConnector);
-
- QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
- server.setThreadPool(httpThreadPool);
-
- String staticDir = azkabanSettings.getString("web.resource.dir",DEFAULT_STATIC_DIR);
- logger.info("Setting up web resource dir " + staticDir);
- Context root = new Context(server, "/", Context.SESSIONS);
-
- root.setResourceBase(staticDir);
- ServletHolder index = new ServletHolder(new IndexServlet());
- root.addServlet(index, "/index");
- root.addServlet(index, "/");
-
- ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
- root.addServlet(staticServlet, "/css/*");
- root.addServlet(staticServlet, "/js/*");
- root.addServlet(staticServlet, "/images/*");
- root.addServlet(staticServlet, "/favicon.ico");
-
- root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager");
- root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
-
- try {
- server.start();
- } catch (Exception e) {
- logger.warn(e);
- Utils.croak(e.getMessage(), 1);
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
-
- public void run() {
- logger.info("Shutting down http server...");
- try {
- server.stop();
- server.destroy();
- } catch (Exception e) {
- logger.error("Error while shutting down http server.", e);
- }
- logger.info("kk thx bye.");
- }
- });
- logger.info("Server running on port " + sslPortNumber + ".");
- }
-
- /**
- * Loads the Azkaban property file from the AZKABAN_HOME conf directory
- *
- * @return
- */
- private static Props loadConfigurationFromAzkabanHome() {
- String azkabanHome = System.getenv("AZKABAN_HOME");
-
- if (azkabanHome == null) {
- logger.error("AZKABAN_HOME not set. Will try default.");
- return null;
- }
-
- if (!new File(azkabanHome).isDirectory()
- || !new File(azkabanHome).canRead()) {
- logger.error(azkabanHome + " is not a readable directory.");
- return null;
- }
-
- File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
- if (!confPath.exists() || !confPath.isDirectory()
- || !confPath.canRead()) {
- logger.error(azkabanHome
- + " does not contain a readable conf directory.");
- return null;
- }
-
- File confFile = new File(confPath, AZKABAN_PROPERTIES_FILE);
- if (!confFile.exists() || confFile.isDirectory() || !confPath.canRead()) {
- logger.error(confFile
- + " does not contain a readable azkaban.properties file.");
- return null;
- }
-
- return loadAzkabanConfiguration(confFile.getPath());
- }
-
- /**
- * Returns the set temp dir
- * @return
- */
- public File getTempDirectory() {
- return tempDir;
- }
-
- /**
- * Loads the Azkaban conf file int a Props object
+ }
+
+ }
+
+ private UserManager loadUserManager(Props props) {
+ Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
+ logger.info("Loading user manager class " + userManagerClass.getName());
+ UserManager manager = null;
+
+ if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
+
+ try {
+ Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
+ manager = (UserManager) userManagerConstructor.newInstance(props);
+ }
+ catch (Exception e) {
+ logger.error("Could not instantiate UserManager "+ userManagerClass.getName());
+ throw new RuntimeException(e);
+ }
+
+ }
+ else {
+ manager = new XmlUserManager(props);
+ }
+
+ return manager;
+ }
+
+ private ProjectManager loadProjectManager(Props props) {
+ Class<?> projectManagerClass = props.getClass(PROJECT_MANAGER_CLASS_PARAM, null);
+ logger.info("Loading project manager class " + projectManagerClass.getName());
+ ProjectManager manager = null;
+
+ if (projectManagerClass != null && projectManagerClass.getConstructors().length > 0) {
+
+ try {
+ Constructor<?> projectManagerConstructor = projectManagerClass.getConstructor(Props.class);
+ manager = (ProjectManager) projectManagerConstructor.newInstance(props);
+ }
+ catch (Exception e) {
+ logger.error("Could not instantiate ProjectManager " + projectManagerClass.getName());
+ throw new RuntimeException(e);
+ }
+
+ }
+ else {
+ manager = new FileProjectManager(props);
+ }
+
+ return manager;
+ }
+
+ private ExecutorManager loadExecutorManager(Props props) {
+ ExecutorManager execManager = new ExecutorManager(props);
+ return execManager;
+ }
+
+ /**
+ * Returns the web session cache.
+ *
+ * @return
+ */
+ public SessionCache getSessionCache() {
+ return sessionCache;
+ }
+
+ /**
+ * Returns the velocity engine for pages to use.
+ *
+ * @return
+ */
+ public VelocityEngine getVelocityEngine() {
+ return velocityEngine;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public UserManager getUserManager() {
+ return userManager;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public ProjectManager getProjectManager() {
+ return projectManager;
+ }
+
+ /**
*
- * @param path
- * @return
*/
- private static Props loadAzkabanConfiguration(String path) {
- try {
- return new Props(null, path);
- } catch (FileNotFoundException e) {
- logger.error("File not found. Could not load azkaban config file "
- + path);
- } catch (IOException e) {
- logger.error("File found, but error reading. Could not load azkaban config file "
- + path);
- }
-
- return null;
- }
+ public ExecutorManager getExecutorManager() {
+ return executorManager;
+ }
+
+ /**
+ * Creates and configures the velocity engine.
+ *
+ * @param devMode
+ * @return
+ */
+ private VelocityEngine configureVelocityEngine(final boolean devMode) {
+ VelocityEngine engine = new VelocityEngine();
+ engine.setProperty("resource.loader", "classpath");
+ engine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+ engine.setProperty("classpath.resource.loader.cache", !devMode);
+ engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
+ engine.setProperty("resource.manager.logwhenfound", false);
+ engine.setProperty("input.encoding", "UTF-8");
+ engine.setProperty("output.encoding", "UTF-8");
+ engine.setProperty("directive.set.null.allowed", true);
+ engine.setProperty("resource.manager.logwhenfound", false);
+ engine.setProperty("velocimacro.permissions.allow.inline", true);
+ engine.setProperty("velocimacro.library.autoreload", devMode);
+ engine.setProperty("velocimacro.library", "/azkaban/webapp/servlet/velocity/macros.vm");
+ engine.setProperty("velocimacro.permissions.allow.inline.to.replace.global", true);
+ engine.setProperty("velocimacro.arguments.strict", true);
+ engine.setProperty("runtime.log.invalid.references", devMode);
+ engine.setProperty("runtime.log.logsystem.class", Log4JLogChute.class);
+ engine.setProperty("runtime.log.logsystem.log4j.logger", Logger.getLogger("org.apache.velocity.Logger"));
+ engine.setProperty("parser.pool.size", 3);
+ return engine;
+ }
+
+ /**
+ * Returns the global azkaban properties
+ *
+ * @return
+ */
+ public Props getAzkabanProps() {
+ return props;
+ }
+
+ /**
+ * Azkaban using Jetty
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ OptionParser parser = new OptionParser();
+
+ OptionSpec<String> configDirectory = parser
+ .acceptsAll(Arrays.asList("c", "conf"), "The conf directory for Azkaban.")
+ .withRequiredArg()
+ .describedAs("conf").ofType(String.class);
+
+ logger.error("Starting Jetty Azkaban...");
+
+ // Grabbing the azkaban settings from the conf directory.
+ Props azkabanSettings = null;
+ OptionSet options = parser.parse(args);
+ if (options.has(configDirectory)) {
+ String path = options.valueOf(configDirectory);
+ logger.info("Loading azkaban settings file from " + path);
+ File file = new File(path, AZKABAN_PROPERTIES_FILE);
+ if (!file.exists() || file.isDirectory() || !file.canRead()) {
+ logger.error("Cannot read file " + file);
+ }
+
+ azkabanSettings = loadAzkabanConfiguration(file.getPath());
+ }
+ else {
+ logger.info("Conf parameter not set, attempting to get value from AZKABAN_HOME env.");
+ azkabanSettings = loadConfigurationFromAzkabanHome();
+ }
+
+ if (azkabanSettings == null) {
+ // one last chance to
+ }
+
+ if (azkabanSettings == null) {
+ logger.error("Azkaban Properties not loaded.");
+ logger.error("Exiting Azkaban...");
+ return;
+ }
+ app = new AzkabanWebServer(azkabanSettings);
+
+ // int portNumber =
+ // azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
+ int sslPortNumber = azkabanSettings.getInt("jetty.ssl.port",
+ DEFAULT_SSL_PORT_NUMBER);
+ int maxThreads = azkabanSettings.getInt("jetty.maxThreads",
+ DEFAULT_THREAD_NUMBER);
+
+ logger.info("Setting up Jetty Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
+
+ final Server server = new Server();
+ SslSocketConnector secureConnector = new SslSocketConnector();
+ secureConnector.setPort(sslPortNumber);
+ secureConnector.setKeystore(azkabanSettings.getString("jetty.keystore"));
+ secureConnector.setPassword(azkabanSettings.getString("jetty.password"));
+ secureConnector.setKeyPassword(azkabanSettings.getString("jetty.keypassword"));
+ secureConnector.setTruststore(azkabanSettings.getString("jetty.truststore"));
+ secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
+ server.addConnector(secureConnector);
+
+ QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+ server.setThreadPool(httpThreadPool);
+
+ String staticDir = azkabanSettings.getString("web.resource.dir", DEFAULT_STATIC_DIR);
+ logger.info("Setting up web resource dir " + staticDir);
+ Context root = new Context(server, "/", Context.SESSIONS);
+
+ root.setResourceBase(staticDir);
+ ServletHolder index = new ServletHolder(new IndexServlet());
+ root.addServlet(index, "/index");
+ root.addServlet(index, "/");
+
+ ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
+ root.addServlet(staticServlet, "/css/*");
+ root.addServlet(staticServlet, "/js/*");
+ root.addServlet(staticServlet, "/images/*");
+ root.addServlet(staticServlet, "/favicon.ico");
+
+ root.addServlet(new ServletHolder(new ProjectManagerServlet()),"/manager");
+ root.addServlet(new ServletHolder(new FlowExecutorServlet()),"/executor");
+
+ root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
+
+ try {
+ server.start();
+ }
+ catch (Exception e) {
+ logger.warn(e);
+ Utils.croak(e.getMessage(), 1);
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+
+ public void run() {
+ logger.info("Shutting down http server...");
+ try {
+ server.stop();
+ server.destroy();
+ }
+ catch (Exception e) {
+ logger.error("Error while shutting down http server.", e);
+ }
+ logger.info("kk thx bye.");
+ }
+ });
+ logger.info("Server running on port " + sslPortNumber + ".");
+ }
+
+ /**
+ * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+ *
+ * @return
+ */
+ private static Props loadConfigurationFromAzkabanHome() {
+ String azkabanHome = System.getenv("AZKABAN_HOME");
+
+ if (azkabanHome == null) {
+ logger.error("AZKABAN_HOME not set. Will try default.");
+ return null;
+ }
+
+ if (!new File(azkabanHome).isDirectory() || !new File(azkabanHome).canRead()) {
+ logger.error(azkabanHome + " is not a readable directory.");
+ return null;
+ }
+
+ File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+ if (!confPath.exists() || !confPath.isDirectory()
+ || !confPath.canRead()) {
+ logger.error(azkabanHome + " does not contain a readable conf directory.");
+ return null;
+ }
+
+ File confFile = new File(confPath, AZKABAN_PROPERTIES_FILE);
+ if (!confFile.exists() || confFile.isDirectory() || !confPath.canRead()) {
+ logger.error(confFile + " does not contain a readable azkaban.properties file.");
+ return null;
+ }
+
+ return loadAzkabanConfiguration(confFile.getPath());
+ }
+
+ /**
+ * Returns the set temp dir
+ *
+ * @return
+ */
+ public File getTempDirectory() {
+ return tempDir;
+ }
+
+ /**
+ * Loads the Azkaban conf file int a Props object
+ *
+ * @param path
+ * @return
+ */
+ private static Props loadAzkabanConfiguration(String path) {
+ try {
+ return new Props(null, path);
+ }
+ catch (FileNotFoundException e) {
+ logger.error("File not found. Could not load azkaban config file " + path);
+ }
+ catch (IOException e) {
+ logger.error("File found, but error reading. Could not load azkaban config file " + path);
+ }
+
+ return null;
+ }
}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 139b1e8..4d100a5 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -18,6 +18,7 @@ package azkaban.webapp.servlet;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,8 +44,7 @@ import azkaban.webapp.session.Session;
* Base Servlet for pages
*/
public abstract class AbstractAzkabanServlet extends HttpServlet {
- private static final DateTimeFormatter ZONE_FORMATTER = DateTimeFormat
- .forPattern("z");
+ private static final DateTimeFormatter ZONE_FORMATTER = DateTimeFormat.forPattern("z");
private static final String AZKABAN_SUCCESS_MESSAGE = "azkaban.success.message";
private static final String AZKABAN_FAILURE_MESSAGE = "azkaban.failure.message";
@@ -71,10 +71,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
@Override
public void init(ServletConfig config) throws ServletException {
- application = (AzkabanWebServer) config
- .getServletContext()
- .getAttribute(
- AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+ application = (AzkabanWebServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
@@ -107,14 +104,14 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @return
* @throws ServletException
*/
- public String getParam(HttpServletRequest request, String name)
- throws ServletException {
+ public String getParam(HttpServletRequest request, String name) throws ServletException {
String p = request.getParameter(name);
- if (p == null)
- throw new ServletException("Missing required parameter '" + name
- + "'.");
- else
+ if (p == null) {
+ throw new ServletException("Missing required parameter '" + name + "'.");
+ }
+ else {
return p;
+ }
}
/**
@@ -126,12 +123,26 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @return
* @throws ServletException
*/
- public int getIntParam(HttpServletRequest request, String name)
- throws ServletException {
+ public int getIntParam(HttpServletRequest request, String name) throws ServletException {
String p = getParam(request, name);
return Integer.parseInt(p);
}
+ public Map<String, String> getParamGroup(HttpServletRequest request, String groupName) throws ServletException {
+ Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
+ String matchString = groupName + "[";
+
+ HashMap<String, String> groupParam = new HashMap<String, String>();
+ while( enumerate.hasMoreElements() ) {
+ String str = (String)enumerate.nextElement();
+ if (str.startsWith(matchString)) {
+ groupParam.put(str.substring(matchString.length(), str.length() - 1), request.getParameter(str));
+ }
+
+ }
+ return groupParam;
+ }
+
/**
* Returns the session value of the request.
*
@@ -139,8 +150,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param key
* @param value
*/
- protected void setSessionValue(HttpServletRequest request, String key,
- Object value) {
+ protected void setSessionValue(HttpServletRequest request, String key, Object value) {
request.getSession(true).setAttribute(key, value);
}
@@ -152,8 +162,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param value
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- protected void addSessionValue(HttpServletRequest request, String key,
- Object value) {
+ protected void addSessionValue(HttpServletRequest request, String key, Object value) {
List l = (List) request.getSession(true).getAttribute(key);
if (l == null)
l = new ArrayList();
@@ -168,8 +177,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param response
* @param errorMsg
*/
- protected void setErrorMessageInCookie(HttpServletResponse response,
- String errorMsg) {
+ protected void setErrorMessageInCookie(HttpServletResponse response, String errorMsg) {
Cookie cookie = new Cookie(AZKABAN_FAILURE_MESSAGE, errorMsg);
response.addCookie(cookie);
}
@@ -181,8 +189,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param response
* @param errorMsg
*/
- protected void setSuccessMessageInCookie(HttpServletResponse response,
- String message) {
+ protected void setSuccessMessageInCookie(HttpServletResponse response, String message) {
Cookie cookie = new Cookie(AZKABAN_SUCCESS_MESSAGE, message);
response.addCookie(cookie);
}
@@ -245,10 +252,8 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param template
* @return
*/
- protected Page newPage(HttpServletRequest req, HttpServletResponse resp,
- Session session, String template) {
- Page page = new Page(req, resp, application.getVelocityEngine(),
- template);
+ protected Page newPage(HttpServletRequest req, HttpServletResponse resp, Session session, String template) {
+ Page page = new Page(req, resp, application.getVelocityEngine(), template);
page.add("azkaban_name", name);
page.add("azkaban_label", label);
page.add("azkaban_color", color);
@@ -260,14 +265,11 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
page.add("context", req.getContextPath());
String errorMsg = getErrorMessageFromCookie(req);
- page.add("error_message",
- errorMsg == null || errorMsg.isEmpty() ? "null" : errorMsg);
+ page.add("error_message", errorMsg == null || errorMsg.isEmpty() ? "null" : errorMsg);
setErrorMessageInCookie(resp, null);
String successMsg = getSuccessMessageFromCookie(req);
- page.add("success_message",
- successMsg == null || successMsg.isEmpty() ? "null"
- : successMsg);
+ page.add("success_message", successMsg == null || successMsg.isEmpty() ? "null" : successMsg);
setSuccessMessageInCookie(resp, null);
return page;
@@ -281,10 +283,8 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param template
* @return
*/
- protected Page newPage(HttpServletRequest req, HttpServletResponse resp,
- String template) {
- Page page = new Page(req, resp, application.getVelocityEngine(),
- template);
+ protected Page newPage(HttpServletRequest req, HttpServletResponse resp, String template) {
+ Page page = new Page(req, resp, application.getVelocityEngine(), template);
page.add("azkaban_name", name);
page.add("azkaban_label", label);
page.add("azkaban_color", color);
@@ -301,8 +301,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @param obj
* @throws IOException
*/
- protected void writeJSON(HttpServletResponse resp, Object obj)
- throws IOException {
+ protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
resp.setContentType(JSON_MIME_TYPE);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(resp.getOutputStream(), obj);
@@ -315,21 +314,17 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
* @return
*/
public static AzkabanWebServer getApp(ServletConfig config) {
- AzkabanWebServer app = (AzkabanWebServer) config
- .getServletContext()
- .getAttribute(
- AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+ AzkabanWebServer app = (AzkabanWebServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
if (app == null) {
- throw new IllegalStateException(
- "No batch application is defined in the servlet context!");
- } else {
+ throw new IllegalStateException("No batch application is defined in the servlet context!");
+ }
+ else {
return app;
}
}
- public static String createJsonResponse(String status, String message,
- String action, Map<String, Object> params) {
+ public static String createJsonResponse(String status, String message, String action, Map<String, Object> params) {
HashMap<String, Object> response = new HashMap<String, Object>();
response.put("status", status);
if (message != null) {
diff --git a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
index f4a755b..7c5745a 100644
--- a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
+++ b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
@@ -42,7 +42,6 @@ public class AzkabanServletContextListener implements ServletContextListener {
public void contextInitialized(ServletContextEvent event) {
this.app = new AzkabanWebServer();
- event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY,
- this.app);
+ event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY, this.app);
}
}
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index c591351..2c643e0 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -1,24 +1,112 @@
package azkaban.webapp.servlet;
+import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.User;
+import azkaban.user.Permission.Type;
+import azkaban.utils.Props;
import azkaban.webapp.session.Session;
public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
+ private static final long serialVersionUID = 1L;
+ private ProjectManager projectManager;
+ private ExecutorManager executorManager;
+ private File tempDir;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+ projectManager = this.getApplication().getProjectManager();
+ executorManager = this.getApplication().getExecutorManager();
+ tempDir = this.getApplication().getTempDirectory();
+ }
@Override
protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
}
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
+ }
+
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ String projectName = getParam(req, "project");
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ ret.put("project", projectName);
+ String ajaxName = getParam(req, "ajax");
+ if (ajaxName.equals("executeFlow")) {
+ ajaxExecuteFlow(req, resp, ret, session.getUser());
+ }
+
+ this.writeJSON(resp, ret);
}
+
+ private void ajaxExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+ String projectId = getParam(req, "project");
+ String flowId = getParam(req, "flow");
+
+ ret.put("flow", flowId);
+
+ Project project = projectManager.getProject(projectId, user);
+ if (project == null) {
+ ret.put("error", "Project " + projectId + " does not exist");
+ return;
+ }
+
+ if (!project.hasPermission(user, Type.EXECUTE)) {
+ ret.put("error", "Permission denied. Cannot execute " + flowId);
+ return;
+ }
+ Flow flow = project.getFlow(flowId);
+ if (flow == null) {
+ ret.put("error", "Flow " + flowId + " cannot be found in project " + project);
+ return;
+ }
+
+ HashMap<String, Props> sources;
+ try {
+ sources = projectManager.getAllFlowProperties(project, flowId, user);
+ }
+ catch (ProjectManagerException e) {
+ ret.put("error", e.getMessage());
+ return;
+ }
+
+ ExecutableFlow exflow = ExecutableFlow.createExecutableFlow(flow, sources);
+
+ Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
+ for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
+ boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
+ exflow.setStatus(entry.getKey(), nodeDisabled ? Status.IGNORED : Status.READY);
+ }
+
+ executorManager.executeFlow(exflow);
+ String execId = exflow.getExecutionId();
+
+ ret.put("execid", "test");
+ }
}
diff --git a/src/java/azkaban/webapp/servlet/IndexServlet.java b/src/java/azkaban/webapp/servlet/IndexServlet.java
index 91ab4ba..69348d7 100644
--- a/src/java/azkaban/webapp/servlet/IndexServlet.java
+++ b/src/java/azkaban/webapp/servlet/IndexServlet.java
@@ -40,21 +40,18 @@ public class IndexServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = -1;
@Override
- protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
User user = session.getUser();
ProjectManager manager = this.getApplication().getProjectManager();
List<Project> projects = manager.getProjects(user);
- Page page = newPage(req, resp, session,
- "azkaban/webapp/servlet/velocity/index.vm");
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/index.vm");
page.add("projects", projects);
page.render();
}
@Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
if (hasParam(req, "action")) {
String action = getParam(req, "action");
if (action.equals("create")) {
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index c7da369..4492bca 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -38,8 +38,7 @@ public abstract class LoginAbstractAzkabanServlet extends
if (hasParam(req, "logout")) {
resp.sendRedirect(req.getContextPath());
if (session != null) {
- getApplication().getSessionCache().removeSession(
- session.getSessionId());
+ getApplication().getSessionCache().removeSession(session.getSessionId());
}
return;
}
@@ -77,13 +76,11 @@ public abstract class LoginAbstractAzkabanServlet extends
}
}
- private void handleLogin(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
+ private void handleLogin(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
handleLogin(req, resp, null);
}
- private void handleLogin(HttpServletRequest req, HttpServletResponse resp,
- String errorMsg) throws ServletException, IOException {
+ private void handleLogin(HttpServletRequest req, HttpServletResponse resp, String errorMsg) throws ServletException, IOException {
Page page = newPage(req, resp,
"azkaban/webapp/servlet/velocity/login.vm");
@@ -95,38 +92,40 @@ public abstract class LoginAbstractAzkabanServlet extends
}
@Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
if (hasParam(req, "action")) {
String action = getParam(req, "action");
if (action.equals("login")) {
handleLoginAction(req, resp);
- } else {
+ }
+ else {
Session session = getSessionFromRequest(req);
if (session == null) {
if (isAjaxCall(req)) {
- String response = createJsonResponse("error",
- "Invalid Session. Need to re-login", "login",
- null);
+ String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
writeResponse(resp, response);
- } else {
+ }
+ else {
handleLogin(req, resp, "Enter username and password");
}
- } else {
+ }
+ else {
handlePost(req, resp, session);
}
}
- } else {
+ }
+ else {
Session session = getSessionFromRequest(req);
if (session == null) {
if (isAjaxCall(req)) {
- String response = createJsonResponse("error",
- "Invalid Session. Need to re-login", "login", null);
+ String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
writeResponse(resp, response);
- } else {
+ }
+ else {
handleLogin(req, resp, "Enter username and password");
}
- } else {
+ }
+ else {
handlePost(req, resp, session);
}
}
@@ -142,7 +141,8 @@ public abstract class LoginAbstractAzkabanServlet extends
User user = null;
try {
user = manager.getUser(username, password);
- } catch (UserManagerException e) {
+ }
+ catch (UserManagerException e) {
handleLogin(req, resp, e.getMessage());
return;
}
@@ -152,26 +152,25 @@ public abstract class LoginAbstractAzkabanServlet extends
resp.addCookie(new Cookie(SESSION_ID_NAME, randomUID));
getApplication().getSessionCache().addSession(session);
handleGet(req, resp, session);
- } else {
+ }
+ else {
if (isAjaxCall(req)) {
- String response = createJsonResponse("error",
- "Incorrect Login.", "login", null);
+ String response = createJsonResponse("error", "Incorrect Login.", "login", null);
writeResponse(resp, response);
- } else {
+ }
+ else {
handleLogin(req, resp, "Enter username and password");
}
}
}
- protected void writeResponse(HttpServletResponse resp, String response)
- throws IOException {
+ protected void writeResponse(HttpServletResponse resp, String response) throws IOException {
Writer writer = resp.getWriter();
writer.append(response);
writer.flush();
}
- protected boolean isAjaxCall(HttpServletRequest req)
- throws ServletException {
+ protected boolean isAjaxCall(HttpServletRequest req) throws ServletException {
String value = req.getHeader("X-Requested-With");
if (value != null) {
logger.info("has X-Requested-With " + value);
@@ -191,9 +190,7 @@ public abstract class LoginAbstractAzkabanServlet extends
* @throws ServletException
* @throws IOException
*/
- protected abstract void handleGet(HttpServletRequest req,
- HttpServletResponse resp, Session session) throws ServletException,
- IOException;
+ protected abstract void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException;
/**
* The post request is handed off to the implementor after the user is
@@ -205,7 +202,5 @@ public abstract class LoginAbstractAzkabanServlet extends
* @throws ServletException
* @throws IOException
*/
- protected abstract void handlePost(HttpServletRequest req,
- HttpServletResponse resp, Session session) throws ServletException,
- IOException;
+ protected abstract void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException;
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 8baf2cd..c2cc965 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -73,8 +73,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
@Override
protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
if ( hasParam(req, "project") ) {
- if (hasParam(req, "json")) {
- handleJSONAction(req, resp, session);
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
}
else if (hasParam(req, "permissions")) {
handlePermissionPage(req, resp, session);
@@ -116,7 +116,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
- private void handleJSONAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
String projectName = getParam(req, "project");
User user = session.getUser();
@@ -132,47 +132,47 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return;
}
- String jsonName = getParam(req, "json");
- if (jsonName.equals("fetchflowjobs")) {
- if (handleJsonPermission(project, user, Type.READ, ret)) {
- jsonFetchFlow(project, ret, req, resp);
+ String ajaxName = getParam(req, "ajax");
+ if (ajaxName.equals("fetchflowjobs")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxFetchFlow(project, ret, req, resp);
}
}
- else if (jsonName.equals("fetchflowgraph")) {
- if (handleJsonPermission(project, user, Type.READ, ret)) {
- jsonFetchFlowGraph(project, ret, req, resp);
+ else if (ajaxName.equals("fetchflowgraph")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxFetchFlowGraph(project, ret, req, resp);
}
}
- else if (jsonName.equals("fetchprojectflows")) {
- if (handleJsonPermission(project, user, Type.READ, ret)) {
- jsonFetchProjectFlows(project, ret, req, resp);
+ else if (ajaxName.equals("fetchprojectflows")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxFetchProjectFlows(project, ret, req, resp);
}
}
- else if (jsonName.equals("changeDescription")) {
- if (handleJsonPermission(project, user, Type.WRITE, ret)) {
- jsonChangeDescription(project, ret, req, resp);
+ else if (ajaxName.equals("changeDescription")) {
+ if (handleAjaxPermission(project, user, Type.WRITE, ret)) {
+ ajaxChangeDescription(project, ret, req, resp);
}
}
- else if (jsonName.equals("getPermissions")) {
- if (handleJsonPermission(project, user, Type.READ, ret)) {
- jsonGetPermissions(project, ret);
+ else if (ajaxName.equals("getPermissions")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxGetPermissions(project, ret);
}
}
- else if (jsonName.equals("changeUserPermission")) {
- if (handleJsonPermission(project, user, Type.ADMIN, ret)) {
- jsonChangePermissions(project, ret, req);
+ else if (ajaxName.equals("changeUserPermission")) {
+ if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
+ ajaxChangePermissions(project, ret, req);
}
}
- else if (jsonName.equals("addUserPermission")) {
- if (handleJsonPermission(project, user, Type.ADMIN, ret)) {
- jsonAddUserPermission(project, ret, req);
+ else if (ajaxName.equals("addUserPermission")) {
+ if (handleAjaxPermission(project, user, Type.ADMIN, ret)) {
+ ajaxAddUserPermission(project, ret, req);
}
}
this.writeJSON(resp, ret);
}
- private boolean handleJsonPermission(Project project, User user, Type type, Map<String, Object> ret) {
+ private boolean handleAjaxPermission(Project project, User user, Type type, Map<String, Object> ret) {
if (project.hasPermission(user, type)) {
return true;
}
@@ -181,7 +181,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return false;
}
- private void jsonChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
String description = getParam(req, "description");
project.setDescription(description);
@@ -192,7 +192,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
- private void jsonFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxFetchProjectFlows(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
ArrayList<Map<String,Object>> flowList = new ArrayList<Map<String,Object>>();
for (Flow flow: project.getFlows()) {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
@@ -203,7 +203,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("flows", flowList);
}
- private void jsonFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxFetchFlowGraph(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
String flowId = getParam(req, "flow");
Flow flow = project.getFlow(flowId);
@@ -215,9 +215,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
nodeObj.put("x", node.getPosition().getX());
nodeObj.put("y", node.getPosition().getY());
nodeObj.put("level", node.getLevel());
- if (node.getState() != Node.State.WAITING) {
- nodeObj.put("state", node.getState());
- }
+
nodeList.add(nodeObj);
}
@@ -253,7 +251,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("edges", edgeList);
}
- private void jsonFetchFlow(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
+ private void ajaxFetchFlow(Project project, HashMap<String, Object> ret, HttpServletRequest req, HttpServletResponse resp) throws ServletException {
String flowId = getParam(req, "flow");
Flow flow = project.getFlow(flowId);
@@ -291,7 +289,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("nodes", nodeList);
}
- private void jsonAddUserPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ private void ajaxAddUserPermission(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
String username = getParam(req, "username");
UserManager userManager = getApplication().getUserManager();
if (!userManager.validateUser(username)) {
@@ -329,7 +327,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
- private void jsonChangePermissions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ private void ajaxChangePermissions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
boolean admin = Boolean.parseBoolean(getParam(req, "permissions[admin]"));
boolean read = Boolean.parseBoolean(getParam(req, "permissions[read]"));
boolean write = Boolean.parseBoolean(getParam(req, "permissions[write]"));
@@ -359,7 +357,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
- private void jsonGetPermissions(Project project, HashMap<String, Object> ret) {
+ private void ajaxGetPermissions(Project project, HashMap<String, Object> ret) {
ArrayList<HashMap<String, Object>> permissions = new ArrayList<HashMap<String, Object>>();
for(Pair<String, Permission> perm: project.getUserPermissions()) {
HashMap<String, Object> permObj = new HashMap<String, Object>();
@@ -547,8 +545,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.render();
}
- private void handleCreate(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException {
+ private void handleCreate(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
String projectDescription = hasParam(req, "description") ? getParam(req, "description") : null;
@@ -582,9 +579,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
- private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart,
- Session session) throws ServletException, IOException {
-
+ private void handleUpload(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> multipart, Session session) throws ServletException, IOException {
User user = session.getUser();
String projectName = (String) multipart.get("project");
FileItem item = (FileItem) multipart.get("file");
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5ba18ad..e880ce6 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -78,6 +78,11 @@
<ul id="jobMenu" class="contextMenu">
<li class="open"><a href="#open">Open...</a></li>
<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
+ <li class="disable"><a href="#disable">Disable</a></li>
+ <li class="disableancestors"><a href="#disableancestors">Disable All Ancestors</a></li>
+ <li class="disabledecendents"><a href="#disabledecendents">Disable Decendents</a></li>
+ <li class="enableancestors"><a href="#enableancestors">Enable All Ancestors</a></li>
+ <li class="disabledecendents"><a href="#disabledecendents">Disable Decendents</a></li>
</ul>
</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index a806c9f..df2c192 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -46,8 +46,8 @@
<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
</div>
- <a id="execute-btn" class="btn1" href="#">Execute Now</a>
- <a id="execute-btn" class="btn2" href="#">Execute Custom</a>
+ <div id="executebtn" class="btn1">Execute</div>
+ <div id="schedulebtn" class="btn2">Schedule Flow</div>
</div>
<div id="headertabs" class="headertabs">
@@ -81,6 +81,8 @@
<ul id="jobMenu" class="contextMenu">
<li class="open"><a href="#open">Open...</a></li>
<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
+ <li class="enable separator"><a href="#enable">Enable</a></li>
+ <li class="disable"><a href="#disable">Disable</a></li>
</ul>
</div>
src/web/css/azkaban.css 23(+22 -1)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index e7c81f7..309536b 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1112,6 +1112,19 @@ tr:hover td {
bottom: 90px;
}
+#nodeOptions {
+ position: absolute;
+ bottom: 10px;
+}
+
+#flowProperties {
+ position: absolute:
+ height: 100px;
+ width: 100%;
+ background-color: #000;
+ top: 100px;
+}
+
#filter {
width: 100%;
}
@@ -1136,6 +1149,10 @@ tr:hover td {
color: #EEE;
}
+#list ul li.nodedisabled {
+ opacity: 0.3;
+}
+
#list ul li a {
font-size: 10pt;
margin-left: 5px;
@@ -1195,6 +1212,10 @@ svg .edge:hover {
stroke-width: 4;
}
+svg .node.disabled {
+ opacity: 0.3;
+}
+
svg .node .backboard {
fill: #FFF;
opacity: 0.05;
@@ -1209,7 +1230,7 @@ svg .node:hover .backboard {
}
svg .node circle {
- fill: #FFF;
+ fill: #888;
stroke: #777;
stroke-width: 2;
}
src/web/js/azkaban.flow.view.js 93(+91 -2)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 6800733..c2eaf7e 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -9,12 +9,54 @@ var handleJobMenuClick = function(action, el, pos) {
else if(action == "openwindow") {
window.open(requestURL);
}
+ else if(action == "disable") {
+ var oldDisabled = graphModel.get("disabled");
+
+ var newDisabled = {};
+ // Copy disabled list
+ if (oldDisabled) {
+ for(var id in oldDisabled) {
+ if(oldDisabled.hasOwnProperty(id)) {
+ var disabled = (oldDisabled[id]);
+ if (disabled) {
+ newDisabled[id]=true;
+ }
+ }
+ }
+ }
+
+ newDisabled[jobid] = true;
+ graphModel.set({disabled: newDisabled});
+ }
+ else if(action == "enable") {
+ var oldDisabled = graphModel.get("disabled");
+ // Copy disabled list
+ if (oldDisabled) {
+ var newDisabled = {};
+ for(var id in oldDisabled) {
+ if(oldDisabled.hasOwnProperty(id)) {
+ var disabled = (oldDisabled[id]);
+ if (disabled) {
+ newDisabled[id]=true;
+ }
+ }
+ }
+
+ if( oldDisabled[jobid]) {
+ newDisabled[jobid] = false;
+ graphModel.set({disabled: newDisabled});
+ }
+ }
+ }
}
function hasClass(el, name)
{
var classes = el.getAttribute("class");
+ if (classes == null) {
+ return false;
+ }
return new RegExp('(\\s|^)'+name+'(\\s|$)').test(classes);
}
@@ -79,6 +121,7 @@ azkaban.JobListView = Backbone.View.extend({
},
initialize: function(settings) {
this.model.bind('change:selected', this.handleSelectionChange, this);
+ this.model.bind('change:disabled', this.handleDisabledChange, this);
this.model.bind('change:graph', this.render, this);
},
filterJobs: function(self) {
@@ -199,6 +242,20 @@ azkaban.JobListView = Backbone.View.extend({
this.model.set({"selected": jobid});
}
},
+ handleDisabledChange: function(evt) {
+ var disabledMap = this.model.get("disabled");
+ for(var id in disabledMap) {
+ if(disabledMap.hasOwnProperty(id)) {
+ var disabled = (disabledMap[id]);
+ if (disabled) {
+ $(this.listNodes[id]).addClass("nodedisabled");
+ }
+ else {
+ $(this.listNodes[id]).removeClass("nodedisabled");
+ }
+ }
+ }
+ },
handleSelectionChange: function(evt) {
if (!this.model.hasChanged("selected")) {
return;
@@ -227,6 +284,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
},
initialize: function(settings) {
this.model.bind('change:selected', this.changeSelected, this);
+ this.model.bind('change:disabled', this.handleDisabledChange, this);
this.model.bind('change:graph', this.render, this);
this.model.bind('resetPanZoom', this.resetPanZoom, this);
@@ -367,6 +425,24 @@ azkaban.SvgGraphView = Backbone.View.extend({
self.mainG.appendChild(line);
}
},
+ handleDisabledChange: function(evt) {
+ var disabledMap = this.model.get("disabled");
+ for(var id in disabledMap) {
+ if(disabledMap.hasOwnProperty(id)) {
+ var disabled = disabledMap[id];
+ this.nodes[id].disabled = disabled;
+ var g = document.getElementById(id);
+
+ if (disabled) {
+ this.nodes[id].disabled = disabled;
+ addClass(g, "disabled");
+ }
+ else {
+ removeClass(g, "disabled");
+ }
+ }
+ }
+ },
drawNode: function(self, node, bounds) {
var svg = self.svgGraph;
var svgns = self.svgns;
@@ -439,7 +515,6 @@ azkaban.SvgGraphView = Backbone.View.extend({
var bounds = this.graphBounds;
$("#svgGraph").svgNavigate("transformToBox", {x: bounds.minX, y: bounds.minY, width: (bounds.maxX - bounds.minX), height: (bounds.maxY - bounds.minY) });
}
-
});
var graphModel;
@@ -447,6 +522,7 @@ azkaban.GraphModel = Backbone.Model.extend({});
$(function() {
var selected;
+
if (window.location.hash) {
var hash = window.location.hash;
if (hash == "#jobslist") {
@@ -470,7 +546,7 @@ $(function() {
$.get(
requestURL,
- {"project": projectName, "json":"fetchflowgraph", "flow":flowName},
+ {"project": projectName, "ajax":"fetchflowgraph", "flow":flowName},
function(data) {
console.log("data fetched");
graphModel.set({data: data});
@@ -478,4 +554,17 @@ $(function() {
},
"json"
);
+
+ $("#executebtn").click( function() {
+ var executeURL = contextURL + "/executor";
+ $.get(
+ executeURL,
+ {"project": projectName, "ajax":"executeFlow", "flow":flowName, "disabled":graphModel.get("disabled")},
+ function(data) {
+ alert("call success");
+ },
+ "json"
+ );
+
+ });
});
src/web/js/azkaban.main.view.js 2(+1 -1)
diff --git a/src/web/js/azkaban.main.view.js b/src/web/js/azkaban.main.view.js
index 49b9d25..10eff92 100644
--- a/src/web/js/azkaban.main.view.js
+++ b/src/web/js/azkaban.main.view.js
@@ -42,7 +42,7 @@ azkaban.ProjectTableView= Backbone.View.extend({
$.get(
requestURL,
- {"project": targetId, "json":"fetchprojectflows"},
+ {"project": targetId, "ajax":"fetchprojectflows"},
function(data) {
console.log("Success");
target.loaded = true;
diff --git a/src/web/js/azkaban.permission.view.js b/src/web/js/azkaban.permission.view.js
index d86bf1c..d6c0bbb 100644
--- a/src/web/js/azkaban.permission.view.js
+++ b/src/web/js/azkaban.permission.view.js
@@ -138,7 +138,7 @@ azkaban.ChangePermissionView= Backbone.View.extend({
$.get(
requestURL,
- {"project": projectId, "username": userID, "json":command, "permissions": this.permission},
+ {"project": projectId, "username": userID, "ajax":command, "permissions": this.permission},
function(data) {
console.log("Output");
if (data.error) {
src/web/js/azkaban.project.view.js 4(+2 -2)
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index b929c96..0365e80 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -86,7 +86,7 @@ azkaban.FlowTableView= Backbone.View.extend({
$.get(
requestURL,
- {"project": projectId, "json":"fetchflowjobs", "flow":targetId},
+ {"project": projectId, "ajax":"fetchflowjobs", "flow":targetId},
function(data) {
console.log("Success");
target.loaded = true;
@@ -172,7 +172,7 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
$.get(
requestURL,
- {"project": projectId, "json":"changeDescription", "description":newText},
+ {"project": projectId, "ajax":"changeDescription", "description":newText},
function(data) {
if (data.error) {
alert(data.error);