azkaban-aplcache
Changes
src/java/azkaban/executor/ExecutorManager.java 20(+10 -10)
src/java/azkaban/project/FileProjectManager.java 34(+11 -23)
Details
src/java/azkaban/executor/ExecutorManager.java 20(+10 -10)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 60701b7..bcbb360 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -31,10 +31,10 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
+import azkaban.project.Project;
import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
@@ -221,15 +221,6 @@ public class ExecutorManager {
return searchFlows;
}
- private boolean isBetween(long val, long from, long to) {
- // Means that range isn't set, so we'll just say that it's okay.
- if (to < -1) {
- return true;
- }
-
- return val >= from && val <= to;
- }
-
private void loadActiveExecutions() throws IOException, ExecutorManagerException {
File activeFlows = new File(basePath, ACTIVE_DIR);
File[] activeFlowDirs = activeFlows.listFiles();
@@ -655,6 +646,10 @@ public class ExecutorManager {
cleanupUnusedFiles(exFlow);
}
+ /**
+ * Thread that polls the executor for executing jobs.
+ * It is also cleans up the flow execution files after it's done.
+ */
private class ExecutingManagerUpdaterThread extends Thread {
private boolean shutdown = false;
private int updateTimeMs = UPDATE_THREAD_MS;
@@ -762,6 +757,11 @@ public class ExecutorManager {
}
}
+ /**
+ * Reference to a Flow Execution.
+ * It allows us to search for Flow and Project with only the Execution Id, it references
+ * a file in the execution directories.
+ */
public static class ExecutionReference implements Comparable<ExecutionReference> {
private String execId;
private String projectId;
src/java/azkaban/project/FileProjectManager.java 34(+11 -23)
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 87bccfc..9611095 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -182,7 +182,7 @@ public class FileProjectManager implements ProjectManager {
return new ArrayList<String>(projects.keySet());
}
- public List<Project> getProjects(User user) {
+ public List<Project> getUserProjects(User user) {
ArrayList<Project> array = new ArrayList<Project>();
for (Project project : projects.values()) {
Permission perm = project.getUserPermission(user);
@@ -194,17 +194,9 @@ public class FileProjectManager implements ProjectManager {
return array;
}
- public Project getProject(String name, User user) {
- Project project = projects.get(name);
- if (project != null) {
- Permission perm = project.getUserPermission(user);
- if (perm.isPermissionSet(Type.ADMIN) || perm.isPermissionSet(Type.READ)) {
- return project;
- } else {
- throw new AccessControlException( "Permission denied. Do not have read access.");
- }
- }
- return project;
+ @Override
+ public Project getProject(String name) {
+ return projects.get(name);
}
public void uploadProject(String projectName, File dir, User uploader, boolean force) throws ProjectManagerException {
@@ -399,23 +391,19 @@ public class FileProjectManager implements ProjectManager {
}
@Override
- public Props getProperties(String projectName, String source, User user)
+ public Props getProperties(String projectName, String source)
throws ProjectManagerException {
Project project = projects.get(projectName);
if (project == null) {
throw new ProjectManagerException("Project " + project + " cannot be found.");
}
- return getProperties(project, source, user);
+ return getProperties(project, source);
}
@Override
- public Props getProperties(Project project, String source, User user)
+ public Props getProperties(Project project, String source)
throws ProjectManagerException {
- if (!project.hasPermission(user, Type.READ)) {
- throw new AccessControlException(
- "Permission denied. Do not have read access.");
- }
String mySource = project.getName() + File.separatorChar
+ project.getSource() + File.separatorChar + "src"
@@ -441,7 +429,7 @@ public class FileProjectManager implements ProjectManager {
}
@Override
- public synchronized Project removeProject(String projectName, User user) {
+ public synchronized Project removeProject(String projectName) {
return null;
}
@@ -477,7 +465,7 @@ public class FileProjectManager implements ProjectManager {
}
@Override
- public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException {
+ public HashMap<String, Props> getAllFlowProperties(Project project, String flowId) throws ProjectManagerException {
Flow flow = project.getFlow(flowId);
if (flow == null) {
throw new ProjectManagerException("Flow " + flowId + " doesn't exist in " + project.getName());
@@ -487,14 +475,14 @@ public class FileProjectManager implements ProjectManager {
HashMap<String, Props> sourceMap = new HashMap<String, Props>();
for (Node node : flow.getNodes()) {
String source = node.getJobSource();
- Props props = getProperties(project, node.getJobSource(), user);
+ Props props = getProperties(project, node.getJobSource());
sourceMap.put(source, props);
}
// Resolve all the shared props.
for(FlowProps flowProps: flow.getAllFlowProps().values()) {
String source = flowProps.getSource();
- Props props = getProperties(project, source, user);
+ Props props = getProperties(project, source);
sourceMap.put(source, props);
}
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 8813316..e7dc5f5 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -11,23 +11,23 @@ public interface ProjectManager {
public List<String> getProjectNames();
- public List<Project> getProjects(User user);
+ public List<Project> getUserProjects(User user);
public void commitProject(String name) throws ProjectManagerException;
- public Project getProject(String name, User user);
+ public Project getProject(String name);
public void uploadProject(String projectName, File projectDir, User uploader, boolean force) throws ProjectManagerException;
public Project createProject(String projectName, String description, User creator) throws ProjectManagerException;
- public Project removeProject(String projectName, User user) throws ProjectManagerException;
+ public Project removeProject(String projectName) throws ProjectManagerException;
- public Props getProperties(String projectName, String source, User user) throws ProjectManagerException;
+ public Props getProperties(String projectName, String source) throws ProjectManagerException;
- public Props getProperties(Project project, String source, User user) throws ProjectManagerException;
+ public Props getProperties(Project project, String source) throws ProjectManagerException;
- public HashMap<String, Props> getAllFlowProperties(Project project, String flowId, User user) throws ProjectManagerException;
+ public HashMap<String, Props> getAllFlowProperties(Project project, String flowId) throws ProjectManagerException;
public void copyProjectSourceFilesToDirectory(Project project, File directory) throws ProjectManagerException;
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 515e14b..ac000b5 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -144,6 +144,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
}
public Map<String, String> getParamGroup(HttpServletRequest request, String groupName) throws ServletException {
+ @SuppressWarnings("unchecked")
Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
String matchString = groupName + "[";
diff --git a/src/java/azkaban/webapp/servlet/ExecutionServlet.java b/src/java/azkaban/webapp/servlet/ExecutionServlet.java
index 4c3a7cc..8ec0023 100644
--- a/src/java/azkaban/webapp/servlet/ExecutionServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutionServlet.java
@@ -1,7 +1,6 @@
package azkaban.webapp.servlet;
import java.io.IOException;
-import java.security.AccessControlException;
import java.util.List;
import javax.servlet.ServletConfig;
@@ -14,7 +13,9 @@ import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
+import azkaban.user.Permission;
import azkaban.user.User;
+import azkaban.user.Permission.Type;
import azkaban.webapp.session.Session;
public class ExecutionServlet extends LoginAbstractAzkabanServlet {
@@ -46,9 +47,7 @@ public class ExecutionServlet extends LoginAbstractAzkabanServlet {
private void handleExecutionsPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/executionspage.vm");
- User user = session.getUser();
-
- //executorManager.
+
List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
@@ -78,16 +77,10 @@ public class ExecutionServlet extends LoginAbstractAzkabanServlet {
}
String projectId = flow.getProjectId();
- Project project = null;
- try {
- project = projectManager.getProject(flow.getProjectId(), user);
- } catch (AccessControlException e) {
- page.add("errorMsg", "Do not have permission to view '" + flow.getExecutionId() + "'.");
+ Project project = getProjectPageByPermission(page, flow.getProjectId(), user, Type.READ);
+ if(project == null) {
page.render();
- }
-
- if (project == null) {
- page.add("errorMsg", "Project " + projectId + " not found.");
+ return;
}
page.add("projectName", projectId);
@@ -95,4 +88,20 @@ public class ExecutionServlet extends LoginAbstractAzkabanServlet {
page.render();
}
+
+ protected Project getProjectPageByPermission(Page page, String projectId, User user, Permission.Type type) {
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ page.add("errorMsg", "Project " + project + " not found.");
+ }
+ else if (!project.hasPermission(user, type)) {
+ page.add("errorMsg", "User " + user.getUserId() + " doesn't have " + type.name() + " permissions on " + projectId);
+ }
+ else {
+ return project;
+ }
+
+ return null;
+ }
}
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 957b6e0..9cbd74f 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -2,7 +2,6 @@ package azkaban.webapp.servlet;
import java.io.File;
import java.io.IOException;
-import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -18,27 +17,24 @@ import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
-import azkaban.flow.Node;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
+import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
-import azkaban.utils.Props;
import azkaban.webapp.session.Session;
public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private ProjectManager projectManager;
private ExecutorManager executorManager;
- private File tempDir;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
projectManager = this.getApplication().getProjectManager();
executorManager = this.getApplication().getExecutorManager();
- tempDir = this.getApplication().getTempDirectory();
}
@Override
@@ -72,16 +68,10 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
}
String projectId = flow.getProjectId();
- Project project = null;
- try {
- project = projectManager.getProject(flow.getProjectId(), user);
- } catch (AccessControlException e) {
- page.add("errorMsg", "Do not have permission to view '" + flow.getExecutionId() + "'.");
- page.render();
- }
-
+ Project project = getProjectPageByPermission(page, flow.getProjectId(), user, Type.READ);
if (project == null) {
- page.add("errorMsg", "Project " + projectId + " not found.");
+ page.render();
+ return;
}
page.add("projectName", projectId);
@@ -90,6 +80,38 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
page.render();
}
+ protected Project getProjectPageByPermission(Page page, String projectId, User user, Permission.Type type) {
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ page.add("errorMsg", "Project " + project + " not found.");
+ }
+ else if (!project.hasPermission(user, type)) {
+ page.add("errorMsg", "User " + user.getUserId() + " doesn't have " + type.name() + " permissions on " + projectId);
+ }
+ else {
+ return project;
+ }
+
+ return null;
+ }
+
+ protected Project getProjectAjaxByPermission(Map<String, Object> ret, String projectId, User user, Permission.Type type) {
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ ret.put("error", "Project " + project + " not found.");
+ }
+ else if (!project.hasPermission(user, type)) {
+ ret.put("error", "User " + user.getUserId() + " doesn't have " + type.name() + " permissions on " + projectId);
+ }
+ else {
+ return project;
+ }
+
+ return null;
+ }
+
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
if (hasParam(req, "ajax")) {
@@ -119,7 +141,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
}
this.writeJSON(resp, ret);
}
-
+
private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException{
String execid = getParam(req, "execid");
Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
@@ -137,16 +159,11 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- Project project = null;
- try {
- project = projectManager.getProject(exFlow.getProjectId(), user);
- }
- catch (AccessControlException e) {
- ret.put("error", "Permission denied. User " + user.getUserId() + " doesn't have permissions to view project " + project.getName());
+ Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ if (project == null) {
return;
}
-
-
+
// Just update the nodes and flow states
ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
for (ExecutableNode node : exFlow.getExecutableNodes()) {
@@ -184,15 +201,11 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- Project project = null;
- try {
- project = projectManager.getProject(exFlow.getProjectId(), user);
- }
- catch (AccessControlException e) {
- ret.put("error", "Permission denied. User " + user.getUserId() + " doesn't have permissions to view project " + project.getName());
+ Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ if (project == null) {
return;
}
-
+
ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String,Object>>();
for (ExecutableNode node : exFlow.getExecutableNodes()) {
@@ -229,14 +242,8 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("flow", flowId);
- Project project = projectManager.getProject(projectId, user);
+ Project project = getProjectAjaxByPermission(ret, projectId, user, Type.EXECUTE);
if (project == null) {
- ret.put("error", "Project " + projectId + " does not exist");
- return;
- }
-
- if (!project.hasPermission(user, Type.EXECUTE)) {
- ret.put("error", "Permission denied. Cannot execute " + flowId);
return;
}
@@ -246,15 +253,6 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- HashMap<String, Props> sources;
- try {
- sources = projectManager.getAllFlowProperties(project, flowId, user);
- }
- catch (ProjectManagerException e) {
- ret.put("error", e.getMessage());
- return;
- }
-
// Create ExecutableFlow
ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
exflow.setSubmitUser(user.getUserId());
@@ -306,14 +304,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
}
String execId = exflow.getExecutionId();
-
- // The following is just a test for cleanup
-// try {
-// executorManager.cleanupUnusedFiles(exflow);
-// } catch (ExecutorManagerException e) {
-// e.printStackTrace();
-// }
-//
+
ret.put("execid", execId);
}
}
diff --git a/src/java/azkaban/webapp/servlet/HistoryServlet.java b/src/java/azkaban/webapp/servlet/HistoryServlet.java
index d665e5b..393c8bd 100644
--- a/src/java/azkaban/webapp/servlet/HistoryServlet.java
+++ b/src/java/azkaban/webapp/servlet/HistoryServlet.java
@@ -1,7 +1,6 @@
package azkaban.webapp.servlet;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import javax.servlet.ServletConfig;
@@ -11,19 +10,16 @@ import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManager.ExecutionReference;
-import azkaban.project.ProjectManager;
import azkaban.webapp.session.Session;
public class HistoryServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
- private ProjectManager projectManager;
private ExecutorManager executorManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
- projectManager = this.getApplication().getProjectManager();
executorManager = this.getApplication().getExecutorManager();
}
diff --git a/src/java/azkaban/webapp/servlet/IndexServlet.java b/src/java/azkaban/webapp/servlet/IndexServlet.java
index 69348d7..7243b9e 100644
--- a/src/java/azkaban/webapp/servlet/IndexServlet.java
+++ b/src/java/azkaban/webapp/servlet/IndexServlet.java
@@ -44,7 +44,7 @@ public class IndexServlet extends LoginAbstractAzkabanServlet {
User user = session.getUser();
ProjectManager manager = this.getApplication().getProjectManager();
- List<Project> projects = manager.getProjects(user);
+ List<Project> projects = manager.getUserProjects(user);
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/index.vm");
page.add("projects", projects);
page.render();
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 480af43..648333b 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -127,15 +127,12 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
HashMap<String, Object> ret = new HashMap<String, Object>();
ret.put("project", projectName);
- Project project = null;
- try {
- project = projectManager.getProject(projectName, user);
- } catch (Exception e) {
- ret.put("error", e.getMessage());
- this.writeJSON(resp, ret);
- return;
+ Project project = projectManager.getProject(projectName);
+ if (project == null) {
+ ret.put("error", "Project " + projectName + " doesn't exist.");
+ return;
}
-
+
String ajaxName = getParam(req, "ajax");
if (ajaxName.equals("fetchflowjobs")) {
if (handleAjaxPermission(project, user, Type.READ, ret)) {
@@ -177,7 +174,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ajaxFetchFlowExecutions(project, ret, req);
}
}
-
+ else {
+ ret.put("error", "Cannot execute command " + ajaxName);
+ }
this.writeJSON(resp, ret);
}
@@ -416,7 +415,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
try {
- project = projectManager.getProject(projectName, user);
+ project = projectManager.getProject(projectName);
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
@@ -449,11 +448,16 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
Flow flow = null;
try {
- project = projectManager.getProject(projectName, user);
+ project = projectManager.getProject(projectName);
+
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
else {
+ if (!project.hasPermission(user, Type.READ)) {
+ throw new AccessControlException( "No permission to view project " + projectName + ".");
+ }
+
page.add("project", project);
flow = project.getFlow(flowName);
@@ -469,7 +473,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.add("errorMsg", "Job " + jobName + " not found.");
}
else {
- Props prop = projectManager.getProperties(projectName, node.getJobSource(), user);
+ Props prop = projectManager.getProperties(projectName, node.getJobSource());
page.add("jobid", node.getId());
page.add("jobtype", node.getType());
@@ -529,11 +533,16 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
Project project = null;
Flow flow = null;
try {
- project = projectManager.getProject(projectName, user);
+ project = projectManager.getProject(projectName);
+
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
else {
+ if (!project.hasPermission(user, Type.READ)) {
+ throw new AccessControlException( "No permission Project " + projectName + ".");
+ }
+
page.add("project", project);
flow = project.getFlow(flowName);
@@ -558,7 +567,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
User user = session.getUser();
Project project = null;
try {
- project = projectManager.getProject(projectName, user);
+ project = projectManager.getProject(projectName);
if (project == null) {
page.add("errorMsg", "Project " + projectName + " not found.");
}
@@ -566,6 +575,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
if (project.hasPermission(user, Type.ADMIN)) {
page.add("admin", true);
}
+
+ if (!project.hasPermission(user, Type.READ)) {
+ throw new AccessControlException( "No permission to view project " + projectName + ".");
+ }
+
page.add("project", project);
page.add("admins", Utils.flattenToString(project.getUsersWithPermission(Type.ADMIN), ","));
page.add("userpermission", project.getUserPermission(user));
@@ -584,7 +598,6 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
private void handleCreate(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
-
String projectName = hasParam(req, "name") ? getParam(req, "name") : null;
String projectDescription = hasParam(req, "description") ? getParam(req, "description") : null;
logger.info("Create project " + projectName);