Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 9ae3178..e8326be 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -225,6 +225,8 @@ public class Constants {
public static final String MAX_XMS_DEFAULT = "1G";
public static final String JOB_MAX_XMX = "job.max.Xmx";
public static final String MAX_XMX_DEFAULT = "2G";
+ // The hadoop user the job should run under. If not specified, it will default to submit user.
+ public static final String USER_TO_PROXY = "user.to.proxy";
}
public static class JobCallbackProperties {
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 769ada2..357249a 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -21,6 +21,7 @@ import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_NATIVE_LIB_FOLD
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
import azkaban.Constants;
+import azkaban.Constants.JobProperties;
import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.utils.process.AzkabanProcess;
import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
@@ -54,7 +55,6 @@ public class ProcessJob extends AbstractProcessJob {
@Deprecated
public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
public static final String EXECUTE_AS_USER = "execute.as.user";
- public static final String USER_TO_PROXY = "user.to.proxy";
public static final String KRB5CCNAME = "KRB5CCNAME";
private static final Duration KILL_TIME = Duration.ofSeconds(30);
private static final String MEMCHECK_ENABLED = "memCheck.enabled";
@@ -354,8 +354,8 @@ public class ProcessJob extends AbstractProcessJob {
*/
private String getEffectiveUser(final Props jobProps) {
String effectiveUser = null;
- if (jobProps.containsKey(USER_TO_PROXY)) {
- effectiveUser = jobProps.getString(USER_TO_PROXY);
+ if (jobProps.containsKey(JobProperties.USER_TO_PROXY)) {
+ effectiveUser = jobProps.getString(JobProperties.USER_TO_PROXY);
} else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
} else {
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 8521a10..4508f0d 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -18,6 +18,7 @@ package azkaban.jobExecutor;
import static org.assertj.core.api.Assertions.assertThat;
+import azkaban.Constants.JobProperties;
import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
import java.io.File;
@@ -83,7 +84,7 @@ public class ProcessJobTest {
// Initialize the Props
this.props.removeLocal(CommonJobProperties.SUBMIT_USER);
- this.props.put("user.to.proxy", "test_user");
+ this.props.put(JobProperties.USER_TO_PROXY, "test_user");
this.props.put(ProcessJob.COMMAND, "ls -al");
this.job.run();
@@ -112,7 +113,7 @@ public class ProcessJobTest {
// Initialize the Props
this.props.removeLocal(CommonJobProperties.SUBMIT_USER);
- this.props.put("user.to.proxy", "root");
+ this.props.put(JobProperties.USER_TO_PROXY, "root");
this.props.put("execute.as.user", "true");
this.props.put(ProcessJob.COMMAND, "ls -al");
@@ -128,7 +129,7 @@ public class ProcessJobTest {
// Initialize the Props
this.props.removeLocal(CommonJobProperties.SUBMIT_USER);
- this.props.put("user.to.proxy", "azkaban");
+ this.props.put(JobProperties.USER_TO_PROXY, "azkaban");
this.props.put("execute.as.user", "true");
this.props.put(ProcessJob.COMMAND, "ls -al");
@@ -184,7 +185,7 @@ public class ProcessJobTest {
final Props jobProps = new Props();
jobProps.put("command", "echo hello");
jobProps.put("working.dir", "/tmp");
- jobProps.put("user.to.proxy", "test");
+ jobProps.put(JobProperties.USER_TO_PROXY, "test");
jobProps.put("azkaban.flow.projectname", "test");
jobProps.put("azkaban.flow.flowid", "test");
jobProps.put("azkaban.job.id", "test");
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 1e9bd11..1e4ba8c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
import azkaban.Constants;
+import azkaban.Constants.JobProperties;
import azkaban.ServiceProvider;
import azkaban.event.Event;
import azkaban.event.EventData;
@@ -1241,7 +1242,7 @@ public class FlowRunner extends EventHandler implements Runnable {
metaData.put("jobType", String.valueOf(node.getType()));
metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
metaData.put("jobProxyUser",
- jobRunner.getProps().getString("user.to.proxy", null));
+ jobRunner.getProps().getString(JobProperties.USER_TO_PROXY, null));
return metaData;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index aa5c856..adce0d1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -17,6 +17,7 @@
package azkaban.execapp;
import azkaban.Constants;
+import azkaban.Constants.JobProperties;
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
@@ -669,13 +670,22 @@ public class JobRunner extends EventHandler implements Runnable {
this.props.put(AbstractProcessJob.WORKING_DIR, this.workingDir.getAbsolutePath());
}
- if (this.props.containsKey("user.to.proxy")) {
- final String jobProxyUser = this.props.getString("user.to.proxy");
+ if (this.props.containsKey(JobProperties.USER_TO_PROXY)) {
+ final String jobProxyUser = this.props.getString(JobProperties.USER_TO_PROXY);
if (this.proxyUsers != null && !this.proxyUsers.contains(jobProxyUser)) {
+ final String permissionsPageURL = getProjectPermissionsURL();
this.logger.error("User " + jobProxyUser
- + " has no permission to execute this job " + this.jobId + "!");
+ + " has no permission to execute this job " + this.jobId + "!"
+ + " If you want to execute this flow as " + jobProxyUser
+ + ", please add it to Proxy Users under project permissions page: " +
+ permissionsPageURL);
return null;
}
+ } else {
+ final String submitUser = this.getNode().getExecutableFlow().getSubmitUser();
+ this.props.put(JobProperties.USER_TO_PROXY, submitUser);
+ this.logger.info("user.to.proxy property was not set, defaulting to submit user " +
+ submitUser);
}
try {
@@ -690,6 +700,20 @@ public class JobRunner extends EventHandler implements Runnable {
}
/**
+ * Get project permissions page URL
+ */
+ private String getProjectPermissionsURL() {
+ String projectPermissionsURL = null;
+ final String baseURL = this.azkabanProps.get(AZKABAN_WEBSERVER_URL);
+ if (baseURL != null) {
+ final String projectName = this.node.getParentFlow().getProjectName();
+ projectPermissionsURL = String
+ .format("%s/manager?project=%s&permissions", baseURL, projectName);
+ }
+ return projectPermissionsURL;
+ }
+
+ /**
* Add useful JVM arguments so it is easier to map a running Java process to a flow, execution id
* and job
*/
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index f177c65..dcaa4ba 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -20,6 +20,7 @@ import static java.lang.Thread.State.TIMED_WAITING;
import static java.lang.Thread.State.WAITING;
import static org.assertj.core.api.Assertions.assertThat;
+import azkaban.Constants.JobProperties;
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
@@ -46,6 +47,7 @@ import org.junit.Test;
public class JobRunnerTest {
+ public static final String SUBMIT_USER = "testUser";
private final Logger logger = Logger.getLogger("JobRunnerTest");
private File workingDir;
private JobTypeManager jobtypeManager;
@@ -102,6 +104,8 @@ public class JobRunnerTest {
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
+ // Verify that user.to.proxy is default to submit user.
+ Assert.assertEquals(SUBMIT_USER, runner.getProps().get(JobProperties.USER_TO_PROXY));
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
@@ -333,6 +337,7 @@ public class JobRunnerTest {
final Props azkabanProps = new Props();
final ExecutableFlow flow = new ExecutableFlow();
flow.setExecutionId(execId);
+ flow.setSubmitUser(SUBMIT_USER);
final ExecutableNode node = new ExecutableNode();
node.setId(name);
node.setParentFlow(flow);
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
index 68b7475..226b1d0 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
@@ -30,7 +30,6 @@ public abstract class HadoopSecurityManager {
public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
public static final String PROXY_USER = "proxy.user";
- public static final String USER_TO_PROXY = "user.to.proxy";
public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
"mapreduce.job.credentials.binary";
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
index 0c90892..86fc949 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
@@ -16,6 +16,7 @@
package azkaban.security.commons;
+import azkaban.Constants.JobProperties;
import azkaban.utils.Props;
import java.io.DataOutputStream;
import java.io.File;
@@ -40,7 +41,6 @@ public class SecurityUtils {
public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
public static final String PROXY_USER = "proxy.user";
- public static final String TO_PROXY = "user.to.proxy";
public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
"mapreduce.job.credentials.binary";
@@ -84,7 +84,7 @@ public class SecurityUtils {
*/
public static UserGroupInformation getProxiedUser(final Properties prop,
final Logger log, final Configuration conf) throws IOException {
- final String toProxy = verifySecureProperty(prop, TO_PROXY, log);
+ final String toProxy = verifySecureProperty(prop, JobProperties.USER_TO_PROXY, log);
final UserGroupInformation user = getProxiedUser(toProxy, prop, log, conf);
if (user == null) {
throw new IOException(
@@ -118,7 +118,7 @@ public class SecurityUtils {
IOException {
final Configuration conf = new Configuration();
- logger.info("Getting proxy user for " + p.getString(TO_PROXY));
+ logger.info("Getting proxy user for " + p.getString(JobProperties.USER_TO_PROXY));
logger.info("Getting proxy user for " + p.toString());
getProxiedUser(p.toProperties(), logger, conf).doAs(
@@ -138,7 +138,7 @@ public class SecurityUtils {
logger.info("Pre-fetching fs token");
final FileSystem fs = FileSystem.get(conf);
final Token<?> fsToken =
- fs.getDelegationToken(p.getString("user.to.proxy"));
+ fs.getDelegationToken(p.getString(JobProperties.USER_TO_PROXY));
logger.info("Created token: " + fsToken.toString());
final Job job =
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index c7defdc..a8b6a8f 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -286,7 +286,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
@Override
public UserGroupInformation getProxiedUser(final Props userProp)
throws HadoopSecurityManagerException {
- final String userToProxy = verifySecureProperty(userProp, USER_TO_PROXY);
+ final String userToProxy = verifySecureProperty(userProp, JobProperties.USER_TO_PROXY);
final UserGroupInformation user = getProxiedUser(userToProxy);
if (user == null) {
throw new HadoopSecurityManagerException(
@@ -528,7 +528,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
final Props props, final Logger logger)
throws HadoopSecurityManagerException {
- final String userToProxy = props.getString(USER_TO_PROXY);
+ final String userToProxy = props.getString(JobProperties.USER_TO_PROXY);
logger.info("Getting hadoop tokens based on props for " + userToProxy);