azkaban-uncached

add switch for proxy user lock down. if one doesn't want security,

2/27/2013 5:58:53 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 31bae29..eebb427 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -84,6 +84,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private List<String> proxyUsers = null;
 	
+	private boolean proxyUserLockDown = false;
+	
 	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
@@ -96,6 +98,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.proxyUsers = getProxyUsers();
 	}
 
+	public void setProxyUserLockDown(boolean doLockDown) {
+		this.proxyUserLockDown = doLockDown;
+	}
+	
 	private List<String> getProxyUsers() {
 		List<String> allUsers = new ArrayList<String>();
 		allUsers.add(flow.getSubmitUser());
@@ -103,7 +109,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		try {
 			permissions = projectLoader.getProjectPermissions(flow.getProjectId());
 			for(Triple<String, Boolean, Permission> triple : permissions) {
-				if(triple.getSecond() == false && triple.getThird().isPermissionSet(Permission.Type.EXECUTE)) {
+				if(triple.getSecond() == false && (triple.getThird().isPermissionSet(Permission.Type.EXECUTE) || triple.getThird().isPermissionSet(Permission.Type.ADMIN) )) {
 					allUsers.add(triple.getFirst());
 				}
 			}
@@ -371,6 +377,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		// should have one prop with system secrets, the other user level props
 		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), proxyUsers, executorLoader, jobtypeManager, logger);
+		jobRunner.setUserLockDown(proxyUserLockDown);
 		jobRunner.addListener(listener);
 
 		return jobRunner;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b0da031..3b20d7d 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -82,6 +82,8 @@ public class FlowRunnerManager implements EventListener {
 	
 	private Object executionDirDeletionSync = new Object();
 	
+	private final boolean proxyUserLockDown;
+	
 	public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
 		executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
 		projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
@@ -112,6 +114,8 @@ public class FlowRunnerManager implements EventListener {
 		
 		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
 		
+		proxyUserLockDown = props.getBoolean("proxy.user.lock.down", false);
+		
 	}
 
 	public Props getGlobalProps() {
@@ -320,6 +324,7 @@ public class FlowRunnerManager implements EventListener {
 		
 		// Setup flow runner
 		FlowRunner runner = new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+		runner.setProxyUserLockDown(proxyUserLockDown);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(this);
 		
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 641787d..ad03089 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -64,6 +64,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private final JobTypeManager jobtypeManager;
 	private List<String> proxyUsers = null;
+	
+	private boolean userLockDown;
 
 	public JobRunner(ExecutableNode node, Props props, File workingDir, List<String> proxyUsers, ExecutorLoader loader, JobTypeManager jobtypeManager, Logger flowLogger) {
 		this.props = props;
@@ -76,6 +78,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.proxyUsers = proxyUsers;
 	}
 	
+	public void setUserLockDown (boolean doLockDown) {
+		this.userLockDown = doLockDown;
+	}
+	
 	public ExecutableNode getNode() {
 		return node;
 	}
@@ -227,7 +233,9 @@ public class JobRunner extends EventHandler implements Runnable {
 			else {
 				if(! proxyUsers.contains(jobProxyUser)) {
 					logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
-					return false;
+					if(userLockDown) {
+						return false;
+					}
 				}
 			}
 			props.put("user.to.proxy", jobProxyUser);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f275119..51f469c 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -266,6 +266,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
+//				else if (ajaxName.equals("fetchLatestJobStatus")) {
+//					ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(), exFlow);
+//				}
 				else if (ajaxName.equals("flowInfo")) {
 					//String projectName = getParam(req, "project");
 					//Project project = projectManager.getProject(projectName);
@@ -297,6 +300,50 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
+//	private void ajaxFetchLatestJobStatus(HttpServletRequest req,HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) {
+//		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+//		if (project == null) {
+//			ret.put("error", "Project doesn't exist or incorrect access permission.");
+//			return;
+//		}
+//		
+//		String projectName;
+//		String flowName;
+//		String jobName;
+//		try {
+//			projectName = getParam(req, "projectName");
+//			flowName = getParam(req, "flowName");
+//			jobName = getParam(req, "jobName");
+//		} catch (Exception e) {
+//			ret.put("error", e.getMessage());
+//			return;
+//		}
+//		
+//		try {
+//			ExecutableNode node = exFlow.getExecutableNode(jobId);
+//			if (node == null) {
+//				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+//				return;
+//			}
+//			
+//			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+//			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+//			if (data == null) {
+//				ret.put("length", 0);
+//				ret.put("offset", offset);
+//				ret.put("data", "");
+//			}
+//			else {
+//				ret.put("length", data.getLength());
+//				ret.put("offset", data.getOffset());
+//				ret.put("data", data.getData());
+//			}
+//		} catch (ExecutorManagerException e) {
+//			throw new ServletException(e);
+//		}
+//		
+//	}
+
 	private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
 		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {