azkaban-aplcache

Merge pull request #567 from johnyu0520/master Merging

11/16/2015 11:41:54 PM

Details

diff --git a/azkaban-common/src/main/c/execute-as-user.c b/azkaban-common/src/main/c/execute-as-user.c
new file mode 100644
index 0000000..f27d5a1
--- /dev/null
+++ b/azkaban-common/src/main/c/execute-as-user.c
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <errno.h>
+#include <grp.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/mount.h>
+#include <sys/types.h>
+#include <pwd.h>
+
+FILE *LOGFILE = NULL;
+FILE *ERRORFILE = NULL;
+int SETUID_OPER_FAILED = 10;
+int USER_NOT_FOUND = 20;
+int INVALID_INPUT = 30;
+
+/*
+ *  Change the real and effective user and group from super user to the specified user
+ *  
+ *  Adopted from:
+ *  ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+ *  
+ */
+
+int change_user(uid_t user, gid_t group) {
+    if (user == getuid() && user == geteuid() &&
+            group == getgid() && group == getegid()) {
+        return 0;
+    }
+
+    if (seteuid(0) != 0) {
+        fprintf(LOGFILE, "unable to reacquire root - %s\n", strerror(errno));
+        fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+                getuid(), getgid(), geteuid(), getegid());
+        return SETUID_OPER_FAILED;
+    }
+    if (setgid(group) != 0) {
+        fprintf(LOGFILE, "unable to set group to %d - %s\n", group,
+                strerror(errno));
+        fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+                getuid(), getgid(), geteuid(), getegid());
+        return SETUID_OPER_FAILED;
+    }
+    if (setuid(user) != 0) {
+        fprintf(LOGFILE, "unable to set user to %d - %s\n", user, strerror(errno));
+        fprintf(LOGFILE, "Real: %d:%d; Effective: %d:%d\n",
+                getuid(), getgid(), geteuid(), getegid());
+        return SETUID_OPER_FAILED;
+    }
+
+    return 0;
+}
+
+int main(int argc, char **argv){
+
+    // set up the logging stream
+    if (!LOGFILE){
+        LOGFILE=stdout;
+    }
+    if (!ERRORFILE){
+        ERRORFILE=stderr;
+    }
+
+    if (argc < 3) {
+        fprintf(ERRORFILE, "Requires at least 3 variables: ./execute-as-user uid command [args]");
+        return INVALID_INPUT;
+    }
+
+    char *uid = argv[1];
+
+    // gather information about user
+    struct passwd *user_info = getpwnam(uid);
+    if (user_info == NULL){
+        fprintf(LOGFILE, "user does not exist: %s", uid);
+        return USER_NOT_FOUND;
+    }
+
+    // try to change user
+    fprintf(LOGFILE, "Changing user: user: %s, uid: %d, gid: %d\n", uid, user_info->pw_uid, user_info->pw_gid);
+    int retval = change_user(user_info->pw_uid, user_info->pw_gid);
+    if (retval != 0){
+        fprintf(LOGFILE, "Error changing user to %s\n", uid);
+        return SETUID_OPER_FAILED;
+    }
+
+    // execute the command
+    char **user_argv = &argv[2];
+    fprintf(LOGFILE, "user command starting from: %s\n", user_argv[0]);
+    fflush(LOGFILE);
+    retval = execvp(*user_argv, user_argv);
+    fprintf(LOGFILE, "system call return value: %d\n", retval);
+
+    // sometimes system(cmd) returns 256, which is interpreted to 0, making a failed job a successful job
+    // hence this goofy piece of if statement.
+    if (retval != 0){
+        return 1;
+    }
+    else{
+        return 0;
+    }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 3ba6d86..a6e4f30 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -49,8 +49,11 @@ public class ProcessJob extends AbstractProcessJob {
 
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
 
+  public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
+  public static final String EXECUTE_AS_USER = "execute.as.user";
+  public static final String EXECUTE_AS_USER_OVERRIDE =
+      "execute.as.user.override";
   public static final String USER_TO_PROXY = "user.to.proxy";
-
   public static final String KRB5CCNAME = "KRB5CCNAME";
 
   public ProcessJob(final String jobId, final Props sysProps,
@@ -101,16 +104,45 @@ public class ProcessJob extends AbstractProcessJob {
 
     info(commands.size() + " commands to execute.");
     File[] propFiles = initPropsFiles();
-    Map<String, String> envVars = getEnvironmentVariables();
 
     // change krb5ccname env var so that each job execution gets its own cache
+    Map<String, String> envVars = getEnvironmentVariables();
     envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
 
+    // determine whether to run as Azkaban or run as effectiveUser
+    String executeAsUserBinaryPath = null;
+    String effectiveUser = null;
+    boolean isExecuteAsUser = determineExecuteAsUser(sysProps, jobProps);
+
+    if (isExecuteAsUser) {
+      String nativeLibFolder = sysProps.getString(NATIVE_LIB_FOLDER);
+      executeAsUserBinaryPath =
+          String.format("%s/%s", nativeLibFolder, "execute-as-user");
+      effectiveUser = getEffectiveUser(jobProps);
+      if ("root".equals(effectiveUser)) {
+        throw new RuntimeException(
+            "Not permitted to proxy as root through Azkaban");
+      }
+    }
+
     for (String command : commands) {
-      info("Command: " + command);
-      AzkabanProcessBuilder builder =
-          new AzkabanProcessBuilder(partitionCommandLine(command))
-              .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+      AzkabanProcessBuilder builder = null;
+      if (isExecuteAsUser) {
+        command =
+            String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser,
+                command);
+        info("Command: " + command);
+        builder =
+            new AzkabanProcessBuilder(partitionCommandLine(command))
+                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
+                .enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath)
+                .setEffectiveUser(effectiveUser);
+      } else {
+        info("Command: " + command);
+        builder =
+            new AzkabanProcessBuilder(partitionCommandLine(command))
+                .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+      }
 
       if (builder.getEnv().size() > 0) {
         info("Environment variables: " + builder.getEnv());
@@ -143,6 +175,15 @@ public class ProcessJob extends AbstractProcessJob {
     generateProperties(propFiles[1]);
   }
 
+  private boolean determineExecuteAsUser(Props sysProps, Props jobProps) {
+    boolean isExecuteAsUser = sysProps.getBoolean(EXECUTE_AS_USER, false);
+    // putting an override in case user needs to override. A temporary opening
+    if (jobProps.containsKey(EXECUTE_AS_USER_OVERRIDE))
+      isExecuteAsUser = jobProps.getBoolean(EXECUTE_AS_USER_OVERRIDE, false);
+
+    return isExecuteAsUser;
+  }
+
   /**
    * <pre>
    * This method extracts the kerberos ticket cache file name from the jobprops.
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 9c3f092..9ad88d3 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -40,6 +40,9 @@ import com.google.common.base.Joiner;
  * loggers.
  */
 public class AzkabanProcess {
+  
+  public static String KILL_COMMAND = "kill";
+  
   private final String workingDir;
   private final List<String> cmd;
   private final Map<String, String> env;
@@ -50,6 +53,10 @@ public class AzkabanProcess {
   private volatile int processId;
   private volatile Process process;
 
+  private boolean isExecuteAsUser = false;
+  private String executeAsUserBinary = null;
+  private String effectiveUser = null;
+
   public AzkabanProcess(final List<String> cmd, final Map<String, String> env,
       final String workingDir, final Logger logger) {
     this.cmd = cmd;
@@ -61,6 +68,15 @@ public class AzkabanProcess {
     this.logger = logger;
   }
 
+  public AzkabanProcess(List<String> cmd, Map<String, String> env,
+      String workingDir, Logger logger, String executeAsUserBinary,
+      String effectiveUser) {
+    this(cmd, env, workingDir, logger);
+    this.isExecuteAsUser = true;
+    this.executeAsUserBinary = executeAsUserBinary;
+    this.effectiveUser = effectiveUser;
+  }
+
   /**
    * Execute this process, blocking until it has completed.
    */
@@ -101,13 +117,20 @@ public class AzkabanProcess {
       }
 
       completeLatch.countDown();
-      if (exitCode != 0) {
-        throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
-      }
 
       // try to wait for everything to get logged out before exiting
       outputGobbler.awaitCompletion(5000);
       errorGobbler.awaitCompletion(5000);
+
+      if (exitCode != 0) {
+        String output =
+            new StringBuilder().append("Stdout:\n")
+                .append(outputGobbler.getRecentLog()).append("\n\n")
+                .append("Stderr:\n").append(errorGobbler.getRecentLog())
+                .append("\n").toString();
+        throw new ProcessFailureException(exitCode, output);
+      }
+
     } finally {
       IOUtils.closeQuietly(process.getInputStream());
       IOUtils.closeQuietly(process.getOutputStream());
@@ -155,7 +178,15 @@ public class AzkabanProcess {
     checkStarted();
     if (processId != 0 && isStarted()) {
       try {
-        Runtime.getRuntime().exec("kill " + processId);
+        if (isExecuteAsUser) {
+          String cmd =
+              String.format("%s %s %s %d", executeAsUserBinary,
+                  effectiveUser, KILL_COMMAND, processId);
+          Runtime.getRuntime().exec(cmd);
+        } else {
+          String cmd = String.format("%s %d", KILL_COMMAND, processId);
+          Runtime.getRuntime().exec(cmd);
+        }
         return completeLatch.await(time, unit);
       } catch (IOException e) {
         logger.error("Kill attempt failed.", e);
@@ -173,7 +204,15 @@ public class AzkabanProcess {
     if (isRunning()) {
       if (processId != 0) {
         try {
-          Runtime.getRuntime().exec("kill -9 " + processId);
+          if (isExecuteAsUser) {
+            String cmd =
+                String.format("%s %s %s -9 %d", executeAsUserBinary,
+                    effectiveUser, KILL_COMMAND, processId);
+            Runtime.getRuntime().exec(cmd);
+          } else {
+            String cmd = String.format("%s -9 %d", KILL_COMMAND, processId);
+            Runtime.getRuntime().exec(cmd);
+          }
         } catch (IOException e) {
           logger.error("Kill attempt failed.", e);
         }
@@ -234,4 +273,12 @@ public class AzkabanProcess {
     return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
         + ", cwd = " + workingDir + ")";
   }
+
+  public boolean isExecuteAsUser() {
+    return isExecuteAsUser;
+  }
+
+  public String getEffectiveUser() {
+    return effectiveUser;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
index 8832195..9e2c2f7 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
@@ -35,6 +35,9 @@ public class AzkabanProcessBuilder {
   private Map<String, String> env = new HashMap<String, String>();
   private String workingDir = System.getProperty("user.dir");
   private Logger logger = Logger.getLogger(AzkabanProcess.class);
+  private boolean isExecuteAsUser = false;
+  private String executeAsUserBinaryPath = null;
+  private String effectiveUser = null;
 
   private int stdErrSnippetSize = 30;
   private int stdOutSnippetSize = 30;
@@ -100,7 +103,12 @@ public class AzkabanProcessBuilder {
   }
 
   public AzkabanProcess build() {
-    return new AzkabanProcess(cmd, env, workingDir, logger);
+    if (isExecuteAsUser) {
+      return new AzkabanProcess(cmd, env, workingDir, logger,
+          executeAsUserBinaryPath, effectiveUser);
+    } else {
+      return new AzkabanProcess(cmd, env, workingDir, logger);
+    }
   }
 
   public List<String> getCommand() {
@@ -116,4 +124,19 @@ public class AzkabanProcessBuilder {
     return "ProcessBuilder(cmd = " + Joiner.on(" ").join(cmd) + ", env = "
         + env + ", cwd = " + workingDir + ")";
   }
+
+  public AzkabanProcessBuilder enableExecuteAsUser() {
+    this.isExecuteAsUser = true;
+    return this;
+  }
+
+  public AzkabanProcessBuilder setExecuteAsUserBinaryPath(String executeAsUserBinaryPath) {
+    this.executeAsUserBinaryPath = executeAsUserBinaryPath;
+    return this;
+  }
+
+  public AzkabanProcessBuilder setEffectiveUser(String effectiveUser) {
+    this.effectiveUser = effectiveUser;
+    return this;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
index 212f1a9..f1cf351 100644
--- a/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/azkaban-common/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -340,6 +340,7 @@ public class JobTypeManager {
             jobProps, jobType));
       }
 
+      // TODO: should the logic below mirror the logic for PluginLoadProps?
       Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
       if (pluginJobProps != null) {
         for (String k : pluginJobProps.getKeySet()) {
@@ -354,7 +355,11 @@ public class JobTypeManager {
       if (pluginLoadProps != null) {
         pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
       } else {
-        pluginLoadProps = new Props();
+        // pluginSet.getCommonPluginLoadProps() will return null if there is no plugins directory.
+        // hence assigning default Props() if that's the case
+        pluginLoadProps = pluginSet.getCommonPluginLoadProps();
+        if(pluginJobProps == null)
+          pluginJobProps = new Props();
       }
 
       job =

build.gradle 16(+16 -0)

diff --git a/build.gradle b/build.gradle
index 332e717..51e4a15 100644
--- a/build.gradle
+++ b/build.gradle
@@ -92,6 +92,22 @@ project(':azkaban-common') {
     }
   }
 
+  apply plugin: 'c'
+  model {
+    components {
+      main(NativeExecutableSpec) {
+        sources {
+          c {
+            source {
+              srcDir "src/main"
+              include "**/*.c"
+            }
+          }
+        }
+      }
+    }
+  }
+
   dependencies {
     compile('com.google.guava:guava:13.0.1')
     compile('com.h2database:h2:1.3.170')