diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index cf10856..5afe1fc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -162,6 +162,7 @@ public class FlowRunnerManager implements EventListener,
this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
if (!this.executionDirectory.exists()) {
this.executionDirectory.mkdirs();
+ setgidPermissionOnExecutionDirectory();
}
this.projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
if (!this.projectDirectory.exists()) {
@@ -204,6 +205,23 @@ public class FlowRunnerManager implements EventListener,
getClass().getClassLoader());
}
+ /**
+ * Setting the gid bit on the execution directory forces all files/directories created within
+ * the directory to be a part of the group associated with the azkaban process. Then, when users
+ * create their own files, the azkaban cleanup thread can properly remove them.
+ *
+ * Java does not provide a standard library api for setting the gid bit because the gid bit
+ * is system dependent, so the only way to set this bit is to start a new process and run
+ * the shell command "chmod g+s " + execution directory name.
+ *
+ * Note that this should work on most Linux distributions and MacOS, but will not work on Windows.
+ */
+ private void setgidPermissionOnExecutionDirectory() throws IOException {
+ logger.info("Creating subprocess to run shell command: chmod g+s "
+ + this.executionDirectory.toString());
+ Runtime.getRuntime().exec("chmod g+s " + this.executionDirectory.toString());
+ }
+
private TrackingThreadPool createExecutorService(final int nThreads) {
final boolean useNewThreadPool =
this.azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
@@ -391,16 +409,16 @@ public class FlowRunnerManager implements EventListener,
}
- public void cancelJobBySLA(int execId, String jobId)
+ public void cancelJobBySLA(final int execId, final String jobId)
throws ExecutorManagerException {
- FlowRunner flowRunner = runningFlows.get(execId);
+ final FlowRunner flowRunner = this.runningFlows.get(execId);
if (flowRunner == null) {
throw new ExecutorManagerException("Execution " + execId
+ " is not running.");
}
- for (JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
+ for (final JobRunner jobRunner : flowRunner.getActiveJobRunners()) {
if (jobRunner.getJobId().equals(jobId)) {
logger.info("Killing job " + jobId + " in execution " + execId + " by SLA");
jobRunner.killBySLA();