azkaban-aplcache

add unit test for #1253 (#1260)

6/29/2017 9:20:20 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 9d49543..c45d25d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -259,7 +259,6 @@ public class ProcessJob extends AbstractProcessJob {
         }
         throw new RuntimeException(e);
       } finally {
-        this.process = null;
         info("Process completed "
             + (success ? "successfully" : "unsuccessfully") + " in "
             + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index e6eb5e8..07f13c0 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -16,10 +16,17 @@
 
 package azkaban.jobExecutor;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
@@ -136,4 +143,52 @@ public class ProcessJobTest {
     Assert.assertArrayEquals(new String[]{"a", "e=b c"},
         ProcessJob.partitionCommandLine(test3));
   }
+
+  /**
+   * test cancellation of the job before associated process is constructed
+   * expect job will be cancelled successfully
+   */
+  @Test
+  public void testCancelDuringPreparation() throws InterruptedException, ExecutionException {
+    final Props jobProps = new Props();
+    jobProps.put("command", "echo hello");
+    jobProps.put("working.dir", "/tmp");
+    jobProps.put("user.to.proxy", "test");
+    jobProps.put("azkaban.flow.projectname", "test");
+    jobProps.put("azkaban.flow.flowid", "test");
+    jobProps.put("azkaban.job.id", "test");
+    jobProps.put("azkaban.flow.execid", "1");
+
+    final Props sysProps = new Props();
+    sysProps.put("execute.as.user", "false");
+    final SleepBeforeRunJob sleepBeforeRunJob = new SleepBeforeRunJob("test", sysProps, jobProps,
+        this.log);
+
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    final Future future = executorService.submit(sleepBeforeRunJob);
+    Thread.sleep(1000);
+    assertThatThrownBy(() -> sleepBeforeRunJob.cancel()).hasMessage("Not started.");
+    future.get();
+    assertThat(sleepBeforeRunJob.getProgress()).isEqualTo(0.0);
+  }
+
+  class SleepBeforeRunJob extends ProcessJob implements Runnable {
+
+    public SleepBeforeRunJob(final String jobId, final Props sysProps, final Props jobProps,
+        final Logger log) {
+      super(jobId, sysProps, jobProps, log);
+    }
+
+    @Override
+    public void run() {
+      try {
+        info("sleep for 3 seconds before actually running the job");
+        Thread.sleep(3 * 1000);
+        super.run();
+      } catch (final Exception ex) {
+        this.getLog().error(ex);
+      }
+    }
+  }
+
 }