azkaban-uncached
Changes
src/java/azkaban/execapp/JobRunner.java 10(+9 -1)
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 31bae29..eebb427 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -84,6 +84,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private List<String> proxyUsers = null;
+ private boolean proxyUserLockDown = false;
+
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
@@ -96,6 +98,10 @@ public class FlowRunner extends EventHandler implements Runnable {
this.proxyUsers = getProxyUsers();
}
+ public void setProxyUserLockDown(boolean doLockDown) {
+ this.proxyUserLockDown = doLockDown;
+ }
+
private List<String> getProxyUsers() {
List<String> allUsers = new ArrayList<String>();
allUsers.add(flow.getSubmitUser());
@@ -103,7 +109,7 @@ public class FlowRunner extends EventHandler implements Runnable {
try {
permissions = projectLoader.getProjectPermissions(flow.getProjectId());
for(Triple<String, Boolean, Permission> triple : permissions) {
- if(triple.getSecond() == false && triple.getThird().isPermissionSet(Permission.Type.EXECUTE)) {
+ if(triple.getSecond() == false && (triple.getThird().isPermissionSet(Permission.Type.EXECUTE) || triple.getThird().isPermissionSet(Permission.Type.ADMIN) )) {
allUsers.add(triple.getFirst());
}
}
@@ -371,6 +377,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// should have one prop with system secrets, the other user level props
JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
+ jobRunner.setUserLockDown(proxyUserLockDown);
jobRunner.addListener(listener);
return jobRunner;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b0da031..3b20d7d 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -82,6 +82,8 @@ public class FlowRunnerManager implements EventListener {
private Object executionDirDeletionSync = new Object();
+ private final boolean proxyUserLockDown;
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
@@ -112,6 +114,8 @@ public class FlowRunnerManager implements EventListener {
jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
+ proxyUserLockDown = props.getBoolean("proxy.user.lock.down", false);
+
}
public Props getGlobalProps() {
@@ -320,6 +324,7 @@ public class FlowRunnerManager implements EventListener {
// Setup flow runner
FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+ runner.setProxyUserLockDown(proxyUserLockDown);
runner.setGlobalProps(globalProps);
runner.addListener(this);
src/java/azkaban/execapp/JobRunner.java 10(+9 -1)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 641787d..ad03089 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -64,6 +64,8 @@ public class JobRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private List<String> proxyUsers = null;
+
+ private boolean userLockDown;
public JobRunner(ExecutableNode node, Props props, File workingDir, List<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
this.props = props;
@@ -76,6 +78,10 @@ public class JobRunner extends EventHandler implements Runnable {
this.proxyUsers = proxyUsers;
}
+ public void setUserLockDown (boolean doLockDown) {
+ this.userLockDown = doLockDown;
+ }
+
public ExecutableNode getNode() {
return node;
}
@@ -227,7 +233,9 @@ public class JobRunner extends EventHandler implements Runnable {
else {
if(! proxyUsers.contains(jobProxyUser)) {
logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
- return false;
+ if(userLockDown) {
+ return false;
+ }
}
}
props.put("user.to.proxy", jobProxyUser);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f275119..51f469c 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -266,6 +266,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
+// else if (ajaxName.equals("fetchLatestJobStatus")) {
+// ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(), exFlow);
+// }
else if (ajaxName.equals("flowInfo")) {
//String projectName = getParam(req, "project");
//Project project = projectManager.getProject(projectName);
@@ -297,6 +300,50 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+// private void ajaxFetchLatestJobStatus(HttpServletRequest req,HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) {
+// Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+// if (project == null) {
+// ret.put("error", "Project doesn't exist or incorrect access permission.");
+// return;
+// }
+//
+// String projectName;
+// String flowName;
+// String jobName;
+// try {
+// projectName = getParam(req, "projectName");
+// flowName = getParam(req, "flowName");
+// jobName = getParam(req, "jobName");
+// } catch (Exception e) {
+// ret.put("error", e.getMessage());
+// return;
+// }
+//
+// try {
+// ExecutableNode node = exFlow.getExecutableNode(jobId);
+// if (node == null) {
+// ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+// return;
+// }
+//
+// int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+// LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+// if (data == null) {
+// ret.put("length", 0);
+// ret.put("offset", offset);
+// ret.put("data", "");
+// }
+// else {
+// ret.put("length", data.getLength());
+// ret.put("offset", data.getOffset());
+// ret.put("data", data.getData());
+// }
+// } catch (ExecutorManagerException e) {
+// throw new ServletException(e);
+// }
+//
+// }
+
private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
if (project == null) {