azkaban-developers

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 9ae3178..e8326be 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -225,6 +225,8 @@ public class Constants {
     public static final String MAX_XMS_DEFAULT = "1G";
     public static final String JOB_MAX_XMX = "job.max.Xmx";
     public static final String MAX_XMX_DEFAULT = "2G";
+    // The Hadoop user the job should run under. If not specified, it defaults to the submit user.
+    public static final String USER_TO_PROXY = "user.to.proxy";
   }
 
   public static class JobCallbackProperties {
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 769ada2..357249a 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -21,6 +21,7 @@ import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_NATIVE_LIB_FOLD
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 
 import azkaban.Constants;
+import azkaban.Constants.JobProperties;
 import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.utils.process.AzkabanProcess;
 import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
@@ -54,7 +55,6 @@ public class ProcessJob extends AbstractProcessJob {
   @Deprecated
   public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
   public static final String EXECUTE_AS_USER = "execute.as.user";
-  public static final String USER_TO_PROXY = "user.to.proxy";
   public static final String KRB5CCNAME = "KRB5CCNAME";
   private static final Duration KILL_TIME = Duration.ofSeconds(30);
   private static final String MEMCHECK_ENABLED = "memCheck.enabled";
@@ -354,8 +354,8 @@ public class ProcessJob extends AbstractProcessJob {
    */
   private String getEffectiveUser(final Props jobProps) {
     String effectiveUser = null;
-    if (jobProps.containsKey(USER_TO_PROXY)) {
-      effectiveUser = jobProps.getString(USER_TO_PROXY);
+    if (jobProps.containsKey(JobProperties.USER_TO_PROXY)) {
+      effectiveUser = jobProps.getString(JobProperties.USER_TO_PROXY);
     } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
       effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
     } else {
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 8521a10..4508f0d 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -18,6 +18,7 @@ package azkaban.jobExecutor;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import azkaban.Constants.JobProperties;
 import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 import java.io.File;
@@ -83,7 +84,7 @@ public class ProcessJobTest {
 
     // Initialize the Props
     this.props.removeLocal(CommonJobProperties.SUBMIT_USER);
-    this.props.put("user.to.proxy", "test_user");
+    this.props.put(JobProperties.USER_TO_PROXY, "test_user");
     this.props.put(ProcessJob.COMMAND, "ls -al");
 
     this.job.run();
@@ -112,7 +113,7 @@ public class ProcessJobTest {
 
     // Initialize the Props
     this.props.removeLocal(CommonJobProperties.SUBMIT_USER);
-    this.props.put("user.to.proxy", "root");
+    this.props.put(JobProperties.USER_TO_PROXY, "root");
     this.props.put("execute.as.user", "true");
     this.props.put(ProcessJob.COMMAND, "ls -al");
 
@@ -128,7 +129,7 @@ public class ProcessJobTest {
 
     // Initialize the Props
     this.props.removeLocal(CommonJobProperties.SUBMIT_USER);
-    this.props.put("user.to.proxy", "azkaban");
+    this.props.put(JobProperties.USER_TO_PROXY, "azkaban");
     this.props.put("execute.as.user", "true");
     this.props.put(ProcessJob.COMMAND, "ls -al");
 
@@ -184,7 +185,7 @@ public class ProcessJobTest {
     final Props jobProps = new Props();
     jobProps.put("command", "echo hello");
     jobProps.put("working.dir", "/tmp");
-    jobProps.put("user.to.proxy", "test");
+    jobProps.put(JobProperties.USER_TO_PROXY, "test");
     jobProps.put("azkaban.flow.projectname", "test");
     jobProps.put("azkaban.flow.flowid", "test");
     jobProps.put("azkaban.job.id", "test");
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1e9bd11..1e4ba8c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
 import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
 
 import azkaban.Constants;
+import azkaban.Constants.JobProperties;
 import azkaban.ServiceProvider;
 import azkaban.event.Event;
 import azkaban.event.EventData;
@@ -1241,7 +1242,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       metaData.put("jobType", String.valueOf(node.getType()));
       metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
       metaData.put("jobProxyUser",
-          jobRunner.getProps().getString("user.to.proxy", null));
+          jobRunner.getProps().getString(JobProperties.USER_TO_PROXY, null));
       return metaData;
     }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index aa5c856..adce0d1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -17,6 +17,7 @@
 package azkaban.execapp;
 
 import azkaban.Constants;
+import azkaban.Constants.JobProperties;
 import azkaban.event.Event;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
@@ -669,13 +670,22 @@ public class JobRunner extends EventHandler implements Runnable {
         this.props.put(AbstractProcessJob.WORKING_DIR, this.workingDir.getAbsolutePath());
       }
 
-      if (this.props.containsKey("user.to.proxy")) {
-        final String jobProxyUser = this.props.getString("user.to.proxy");
+      if (this.props.containsKey(JobProperties.USER_TO_PROXY)) {
+        final String jobProxyUser = this.props.getString(JobProperties.USER_TO_PROXY);
         if (this.proxyUsers != null && !this.proxyUsers.contains(jobProxyUser)) {
+          final String permissionsPageURL = getProjectPermissionsURL();
           this.logger.error("User " + jobProxyUser
-              + " has no permission to execute this job " + this.jobId + "!");
+              + " has no permission to execute this job " + this.jobId + "!"
+              + " If you want to execute this flow as " + jobProxyUser
+              + ", please add it to Proxy Users under project permissions page: " +
+              permissionsPageURL);
           return null;
         }
+      } else {
+        final String submitUser = this.getNode().getExecutableFlow().getSubmitUser();
+        this.props.put(JobProperties.USER_TO_PROXY, submitUser);
+        this.logger.info("user.to.proxy property was not set, defaulting to submit user " +
+            submitUser);
       }
 
       try {
@@ -690,6 +700,20 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   /**
+   * Get the project permissions page URL; null if the AZKABAN_WEBSERVER_URL lookup yields null.
+   */
+  private String getProjectPermissionsURL() {
+    String projectPermissionsURL = null;
+    final String baseURL = this.azkabanProps.get(AZKABAN_WEBSERVER_URL);
+    if (baseURL != null) {
+      final String projectName = this.node.getParentFlow().getProjectName();
+      projectPermissionsURL = String
+          .format("%s/manager?project=%s&permissions", baseURL, projectName);
+    }
+    return projectPermissionsURL;
+  }
+
+  /**
    * Add useful JVM arguments so it is easier to map a running Java process to a flow, execution id
    * and job
    */
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index f177c65..dcaa4ba 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -20,6 +20,7 @@ import static java.lang.Thread.State.TIMED_WAITING;
 import static java.lang.Thread.State.WAITING;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import azkaban.Constants.JobProperties;
 import azkaban.event.Event;
 import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
@@ -46,6 +47,7 @@ import org.junit.Test;
 
 public class JobRunnerTest {
 
+  public static final String SUBMIT_USER = "testUser";
   private final Logger logger = Logger.getLogger("JobRunnerTest");
   private File workingDir;
   private JobTypeManager jobtypeManager;
@@ -102,6 +104,8 @@ public class JobRunnerTest {
     final Props outputProps = runner.getNode().getOutputProps();
     Assert.assertTrue(outputProps != null);
     Assert.assertTrue(logFile.exists());
+    // Verify that user.to.proxy defaults to the submit user.
+    Assert.assertEquals(SUBMIT_USER, runner.getProps().get(JobProperties.USER_TO_PROXY));
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
@@ -333,6 +337,7 @@ public class JobRunnerTest {
     final Props azkabanProps = new Props();
     final ExecutableFlow flow = new ExecutableFlow();
     flow.setExecutionId(execId);
+    flow.setSubmitUser(SUBMIT_USER);
     final ExecutableNode node = new ExecutableNode();
     node.setId(name);
     node.setParentFlow(flow);
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
index 68b7475..226b1d0 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
@@ -30,7 +30,6 @@ public abstract class HadoopSecurityManager {
 
   public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
   public static final String PROXY_USER = "proxy.user";
-  public static final String USER_TO_PROXY = "user.to.proxy";
   public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
   public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
       "mapreduce.job.credentials.binary";
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
index 0c90892..86fc949 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
@@ -16,6 +16,7 @@
 
 package azkaban.security.commons;
 
+import azkaban.Constants.JobProperties;
 import azkaban.utils.Props;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -40,7 +41,6 @@ public class SecurityUtils {
   public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
   public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
   public static final String PROXY_USER = "proxy.user";
-  public static final String TO_PROXY = "user.to.proxy";
   public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
   public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
       "mapreduce.job.credentials.binary";
@@ -84,7 +84,7 @@ public class SecurityUtils {
    */
   public static UserGroupInformation getProxiedUser(final Properties prop,
       final Logger log, final Configuration conf) throws IOException {
-    final String toProxy = verifySecureProperty(prop, TO_PROXY, log);
+    final String toProxy = verifySecureProperty(prop, JobProperties.USER_TO_PROXY, log);
     final UserGroupInformation user = getProxiedUser(toProxy, prop, log, conf);
     if (user == null) {
       throw new IOException(
@@ -118,7 +118,7 @@ public class SecurityUtils {
       IOException {
 
     final Configuration conf = new Configuration();
-    logger.info("Getting proxy user for " + p.getString(TO_PROXY));
+    logger.info("Getting proxy user for " + p.getString(JobProperties.USER_TO_PROXY));
     logger.info("Getting proxy user for " + p.toString());
 
     getProxiedUser(p.toProperties(), logger, conf).doAs(
@@ -138,7 +138,7 @@ public class SecurityUtils {
               logger.info("Pre-fetching fs token");
               final FileSystem fs = FileSystem.get(conf);
               final Token<?> fsToken =
-                  fs.getDelegationToken(p.getString("user.to.proxy"));
+                  fs.getDelegationToken(p.getString(JobProperties.USER_TO_PROXY));
               logger.info("Created token: " + fsToken.toString());
 
               final Job job =
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index c7defdc..a8b6a8f 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -286,7 +286,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   @Override
   public UserGroupInformation getProxiedUser(final Props userProp)
       throws HadoopSecurityManagerException {
-    final String userToProxy = verifySecureProperty(userProp, USER_TO_PROXY);
+    final String userToProxy = verifySecureProperty(userProp, JobProperties.USER_TO_PROXY);
     final UserGroupInformation user = getProxiedUser(userToProxy);
     if (user == null) {
       throw new HadoopSecurityManagerException(
@@ -528,7 +528,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       final Props props, final Logger logger)
       throws HadoopSecurityManagerException {
 
-    final String userToProxy = props.getString(USER_TO_PROXY);
+    final String userToProxy = props.getString(JobProperties.USER_TO_PROXY);
 
     logger.info("Getting hadoop tokens based on props for " + userToProxy);