diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 9d49543..c45d25d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -259,7 +259,6 @@ public class ProcessJob extends AbstractProcessJob {
}
throw new RuntimeException(e);
} finally {
- this.process = null;
info("Process completed "
+ (success ? "successfully" : "unsuccessfully") + " in "
+ ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index e6eb5e8..07f13c0 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -16,10 +16,17 @@
package azkaban.jobExecutor;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
@@ -136,4 +143,52 @@ public class ProcessJobTest {
Assert.assertArrayEquals(new String[]{"a", "e=b c"},
ProcessJob.partitionCommandLine(test3));
}
+
+ /**
+ * test cancellation of the job before associated process is constructed
+ * expect job will be cancelled successfully
+ */
+ @Test
+ public void testCancelDuringPreparation() throws InterruptedException, ExecutionException {
+ final Props jobProps = new Props();
+ jobProps.put("command", "echo hello");
+ jobProps.put("working.dir", "/tmp");
+ jobProps.put("user.to.proxy", "test");
+ jobProps.put("azkaban.flow.projectname", "test");
+ jobProps.put("azkaban.flow.flowid", "test");
+ jobProps.put("azkaban.job.id", "test");
+ jobProps.put("azkaban.flow.execid", "1");
+
+ final Props sysProps = new Props();
+ sysProps.put("execute.as.user", "false");
+ final SleepBeforeRunJob sleepBeforeRunJob = new SleepBeforeRunJob("test", sysProps, jobProps,
+ this.log);
+
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final Future future = executorService.submit(sleepBeforeRunJob);
+ Thread.sleep(1000);
+ assertThatThrownBy(() -> sleepBeforeRunJob.cancel()).hasMessage("Not started.");
+ future.get();
+ assertThat(sleepBeforeRunJob.getProgress()).isEqualTo(0.0);
+ }
+
+ class SleepBeforeRunJob extends ProcessJob implements Runnable {
+
+ public SleepBeforeRunJob(final String jobId, final Props sysProps, final Props jobProps,
+ final Logger log) {
+ super(jobId, sysProps, jobProps, log);
+ }
+
+ @Override
+ public void run() {
+ try {
+ info("sleep for 3 seconds before actually running the job");
+ Thread.sleep(3 * 1000);
+ super.run();
+ } catch (final Exception ex) {
+ this.getLog().error(ex);
+ }
+ }
+ }
+
}