azkaban-aplcache

Fix for failing to kill a job due to a race condition (#1310) *

8/8/2017 7:37:00 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 317cd77..d26bb10 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2012 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -26,14 +26,15 @@ import azkaban.metrics.CommonMetrics;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.SystemMemoryInfo;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.HashSet;
 import java.util.Set;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 
@@ -55,6 +56,9 @@ public class ProcessJob extends AbstractProcessJob {
   private volatile AzkabanProcess process;
   private volatile boolean killed = false;
 
+  // For testing only. True if the job process exits successfully.
+  private volatile boolean success;
+
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
@@ -207,9 +211,10 @@ public class ProcessJob extends AbstractProcessJob {
     final boolean isExecuteAsUser = this.sysProps.getBoolean(EXECUTE_AS_USER, true);
 
     //Get list of users we never execute flows as. (ie: root, azkaban)
-    final Set<String> blackListedUsers = new HashSet<String>(
+    final Set<String> blackListedUsers = new HashSet<>(
         Arrays.asList(
-            this.sysProps.getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS,"root,azkaban").split(",")
+            this.sysProps.getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS, "root,azkaban")
+                .split(",")
         )
     );
 
@@ -254,13 +259,20 @@ public class ProcessJob extends AbstractProcessJob {
       // print out the Job properties to the job log.
       this.logJobProperties();
 
-      boolean success = false;
-      this.process = builder.build();
+      synchronized (this) {
+        // Make sure that checking if the process job is killed and creating an AzkabanProcess
+        // object are atomic. The cancel method relies on this to make sure that if this.process is
+        // not null, this block of code which includes checking if the job is killed has not been
+        // executed yet.
+        if (this.killed) {
+          info("The job is killed. Abort. No job process created.");
+          return;
+        }
+        this.process = builder.build();
+      }
       try {
-        if (!this.killed) {
           this.process.run();
-          success = true;
-        }
+          this.success = true;
       } catch (final Throwable e) {
         for (final File file : propFiles) {
           if (file != null && file.exists()) {
@@ -270,7 +282,7 @@ public class ProcessJob extends AbstractProcessJob {
         throw new RuntimeException(e);
       } finally {
         info("Process completed "
-            + (success ? "successfully" : "unsuccessfully") + " in "
+            + (this.success ? "successfully" : "unsuccessfully") + " in "
             + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
       }
     }
@@ -368,11 +380,14 @@ public class ProcessJob extends AbstractProcessJob {
     synchronized (this) {
       this.killed = true;
       this.notify();
+      if (this.process == null) {
+        // The job thread has not checked if the job is killed yet.
+        // setting the killed flag should be enough to abort the job.
+        // There is no job process to kill.
+        return;
+      }
     }
-
-    if (this.process == null) {
-      throw new IllegalStateException("Not started.");
-    }
+    this.process.awaitStartup();
     final boolean processkilled = this.process
         .softKill(KILL_TIME.toMillis(), TimeUnit.MILLISECONDS);
     if (!processkilled) {
@@ -390,6 +405,16 @@ public class ProcessJob extends AbstractProcessJob {
     return this.process.getProcessId();
   }
 
+  @VisibleForTesting
+  boolean isSuccess() {
+    return this.success;
+  }
+
+  @VisibleForTesting
+  AzkabanProcess getProcess() {
+    return this.process;
+  }
+
   public String getPath() {
     return this._jobPath == null ? "" : this._jobPath;
   }
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 5683d9f..4175d58 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2012 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -150,6 +150,9 @@ public class AzkabanProcess {
   /**
    * Await the start of this process
    *
+   * When this method returns, the job process has been created and a this.processId has been
+   * set.
+   *
    * @throws InterruptedException if the thread is interrupted while waiting.
    */
   public void awaitStartup() throws InterruptedException {
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index c75211b..8521a10 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -17,7 +17,6 @@
 package azkaban.jobExecutor;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
@@ -177,8 +176,8 @@ public class ProcessJobTest {
   }
 
   /**
-   * test cancellation of the job before associated process is constructed
-   * expect job will be cancelled successfully
+   * test cancellation of the job before associated process is constructed expect job will be
+   * cancelled successfully
    */
   @Test
   public void testCancelDuringPreparation() throws InterruptedException, ExecutionException {
@@ -198,12 +197,36 @@ public class ProcessJobTest {
 
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
     final Future future = executorService.submit(sleepBeforeRunJob);
-    Thread.sleep(1000);
-    assertThatThrownBy(() -> sleepBeforeRunJob.cancel()).hasMessage("Not started.");
+    sleepBeforeRunJob.cancel();
     future.get();
     assertThat(sleepBeforeRunJob.getProgress()).isEqualTo(0.0);
   }
 
+  @Test
+  public void testCancelAfterJobProcessCreation() throws InterruptedException, ExecutionException {
+    this.props.put(ProcessJob.COMMAND, "sleep 1");
+
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    final Future future = executorService.submit(() -> {
+      try {
+        this.job.run();
+      } catch (final Exception e) {
+        e.printStackTrace();
+      }
+    });
+
+    // Wait for the AzkabanProcess object to be created before calling job.cancel so that the job
+    // process is created.
+    while (this.job.getProcess() == null) {
+      Thread.sleep(1);
+    }
+
+    this.job.cancel();
+    // Wait for the job to finish before testing its state.
+    future.get();
+    assertThat(this.job.isSuccess()).isFalse();
+  }
+
   static class SleepBeforeRunJob extends ProcessJob implements Runnable {
 
     public SleepBeforeRunJob(final String jobId, final Props sysProps, final Props jobProps,
@@ -214,13 +237,12 @@ public class ProcessJobTest {
     @Override
     public void run() {
       try {
-        info("sleep for 3 seconds before actually running the job");
-        Thread.sleep(3 * 1000);
+        info("sleep for some time before actually running the job");
+        Thread.sleep(10);
         super.run();
       } catch (final Exception ex) {
         this.getLog().error(ex);
       }
     }
   }
-
 }