azkaban-uncached

put proxy user info in exflow

3/5/2013 12:48:56 AM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index fbbedc0..74bff38 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -97,26 +97,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
 		
-		this.proxyUsers = getProxyUsers();
+		this.proxyUsers = flow.getProxyUsers();
 	}
 
 	public void setProxyUserLockDown(boolean doLockDown) {
 		this.proxyUserLockDown = doLockDown;
 	}
-	
-	private HashSet<String> getProxyUsers() {
-		HashSet<String> proxyUsers = null;
-
-		try {
-			Project project = projectLoader.fetchProjectById(flow.getProjectId());
-			proxyUsers = project.getProxyUsers();
-		} catch (ProjectManagerException e) {
-			// This gets funny when no user specified and submitted by the scheduler
-			logger.error("Failed to get project permission from project. Using default permission.", e);
-		}
-		
-		return proxyUsers;
-	}
 
 	public FlowRunner setGlobalProps(Props globalProps) {
 		this.globalProps = globalProps;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index ce88cb8..3892987 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -59,6 +59,8 @@ public class ExecutableFlow {
 	
 	private Integer pipelineLevel = null;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
+	
+	private HashSet<String> proxyUsers = new HashSet<String>();
 
 	public enum FailureAction {
 		FINISH_CURRENTLY_RUNNING,
@@ -159,6 +161,14 @@ public class ExecutableFlow {
 		return flowParameters;
 	}
 	
+	public void setProxyUsers(HashSet<String> proxyUsers) {
+		this.proxyUsers = proxyUsers;
+	}
+	
+	public HashSet<String> getProxyUsers() {
+		return this.proxyUsers;
+	}
+	
 	private void setFlow(Flow flow) {
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
@@ -351,6 +361,10 @@ public class ExecutableFlow {
 			nodes.add(node.toObject());
 		}
 		flowObj.put("nodes", nodes);
+		
+		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+		
+		flowObj.put("proxyUsers", proxyUserList);
 
 		return flowObj;
 	}
@@ -487,6 +501,11 @@ public class ExecutableFlow {
 		// Failure emails
 		exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
 		
+		if(flowObj.containsKey("proxyUsers")) {
+			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+			exFlow.setProxyUsers(new HashSet<String>(proxyUserList));
+		}
+		
 		return exFlow;
 	}
 	
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 9a53f37..bf27ac7 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -379,6 +379,7 @@ public class ScheduleManager {
 										// Create ExecutableFlow
 										ExecutableFlow exflow = new ExecutableFlow(flow);
 										exflow.setSubmitUser(runningSched.getSubmitUser());
+										exflow.setProxyUsers(project.getProxyUsers());
 										
 										FlowOptions flowOptions = runningSched.getFlowOptions();
 										
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 51f469c..56efea4 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -683,6 +683,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		
 		ExecutableFlow exflow = new ExecutableFlow(flow);
 		exflow.setSubmitUser(user.getUserId());
+		exflow.setProxyUsers(project.getProxyUsers());
 
 		if (hasParam(req, "failureAction")) {
 			String option = getParam(req, "failureAction");