azkaban-aplcache

Running chmod g+s on executions directory to enable proper

7/28/2017 1:38:48 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index cf10856..5afe1fc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -162,6 +162,7 @@ public class FlowRunnerManager implements EventListener,
     this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
     if (!this.executionDirectory.exists()) {
       this.executionDirectory.mkdirs();
+      setgidPermissionOnExecutionDirectory();
     }
     this.projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
     if (!this.projectDirectory.exists()) {
@@ -204,6 +205,23 @@ public class FlowRunnerManager implements EventListener,
             getClass().getClassLoader());
   }
 
+  /**
+   * Setting the gid bit on the execution directory forces all files/directories created within
+   * the directory to be a part of the group associated with the azkaban process. Then, when users
+   * create their own files, the azkaban cleanup thread can properly remove them.
+   *
+   * Java does not provide a standard library api for setting the gid bit because the gid bit
+   * is system dependent, so the only way to set this bit is to start a new process and run
+   * the shell command "chmod g+s " + execution directory name.
+   *
+   * Note that this should work on most Linux distributions and MacOS, but will not work on Windows.
+   */
+  private void setgidPermissionOnExecutionDirectory() throws IOException {
+    logger.info("Creating subprocess to run shell command: chmod g+s "
+        + this.executionDirectory.toString());
+    Runtime.getRuntime().exec("chmod g+s " + this.executionDirectory.toString());
+  }
+
   private TrackingThreadPool createExecutorService(final int nThreads) {
     final boolean useNewThreadPool =
         this.azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
@@ -391,16 +409,16 @@ public class FlowRunnerManager implements EventListener,
   }
 
 
-  public void cancelJobBySLA(int execId, String jobId)
+  public void cancelJobBySLA(final int execId, final String jobId)
       throws ExecutorManagerException {
-    FlowRunner flowRunner = runningFlows.get(execId);
+    final FlowRunner flowRunner = this.runningFlows.get(execId);
 
     if (flowRunner == null) {
       throw new ExecutorManagerException("Execution " + execId
           + " is not running.");
     }
 
-    for (JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
+    for (final JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
       if (jobRunner.getJobId().equals(jobId)) {
         logger.info("Killing job " + jobId + " in execution " + execId + " by SLA");
         jobRunner.killBySLA();