azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 16(+1 -15)
Details
src/java/azkaban/execapp/FlowRunner.java 16(+1 -15)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index fbbedc0..74bff38 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -97,26 +97,12 @@ public class FlowRunner extends EventHandler implements Runnable {
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
- this.proxyUsers = getProxyUsers();
+ this.proxyUsers = flow.getProxyUsers();
}
public void setProxyUserLockDown(boolean doLockDown) {
this.proxyUserLockDown = doLockDown;
}
-
- private HashSet<String> getProxyUsers() {
- HashSet<String> proxyUsers = null;
-
- try {
- Project project = projectLoader.fetchProjectById(flow.getProjectId());
- proxyUsers = project.getProxyUsers();
- } catch (ProjectManagerException e) {
- // This gets funny when no user specified and submitted by the scheduler
- logger.error("Failed to get project permission from project. Using default permission.", e);
- }
-
- return proxyUsers;
- }
public FlowRunner setGlobalProps(Props globalProps) {
this.globalProps = globalProps;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index ce88cb8..3892987 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -59,6 +59,8 @@ public class ExecutableFlow {
private Integer pipelineLevel = null;
private Map<String, String> flowParameters = new HashMap<String, String>();
+
+ private HashSet<String> proxyUsers = new HashSet<String>();
public enum FailureAction {
FINISH_CURRENTLY_RUNNING,
@@ -159,6 +161,14 @@ public class ExecutableFlow {
return flowParameters;
}
+ public void setProxyUsers(HashSet<String> proxyUsers) {
+ this.proxyUsers = proxyUsers;
+ }
+
+ public HashSet<String> getProxyUsers() {
+ return this.proxyUsers;
+ }
+
private void setFlow(Flow flow) {
for (Node node: flow.getNodes()) {
String id = node.getId();
@@ -351,6 +361,10 @@ public class ExecutableFlow {
nodes.add(node.toObject());
}
flowObj.put("nodes", nodes);
+
+ ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
+
+ flowObj.put("proxyUsers", proxyUserList);
return flowObj;
}
@@ -487,6 +501,11 @@ public class ExecutableFlow {
// Failure emails
exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
+ if(flowObj.containsKey("proxyUsers")) {
+ ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+ exFlow.setProxyUsers(new HashSet<String>(proxyUserList));
+ }
+
return exFlow;
}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 9a53f37..bf27ac7 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -379,6 +379,7 @@ public class ScheduleManager {
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(runningSched.getSubmitUser());
+ exflow.setProxyUsers(project.getProxyUsers());
FlowOptions flowOptions = runningSched.getFlowOptions();
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 51f469c..56efea4 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -683,6 +683,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(user.getUserId());
+ exflow.setProxyUsers(project.getProxyUsers());
if (hasParam(req, "failureAction")) {
String option = getParam(req, "failureAction");