diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 317cd77..d26bb10 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -26,14 +26,15 @@ import azkaban.metrics.CommonMetrics;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.SystemMemoryInfo;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.HashSet;
import java.util.Set;
-import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -55,6 +56,9 @@ public class ProcessJob extends AbstractProcessJob {
private volatile AzkabanProcess process;
private volatile boolean killed = false;
+ // For testing only. True if the job process exits successfully.
+ private volatile boolean success;
+
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
super(jobId, sysProps, jobProps, log);
@@ -207,9 +211,10 @@ public class ProcessJob extends AbstractProcessJob {
final boolean isExecuteAsUser = this.sysProps.getBoolean(EXECUTE_AS_USER, true);
//Get list of users we never execute flows as. (ie: root, azkaban)
- final Set<String> blackListedUsers = new HashSet<String>(
+ final Set<String> blackListedUsers = new HashSet<>(
Arrays.asList(
- this.sysProps.getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS,"root,azkaban").split(",")
+ this.sysProps.getString(Constants.ConfigurationKeys.BLACK_LISTED_USERS, "root,azkaban")
+ .split(",")
)
);
@@ -254,13 +259,20 @@ public class ProcessJob extends AbstractProcessJob {
// print out the Job properties to the job log.
this.logJobProperties();
- boolean success = false;
- this.process = builder.build();
+ synchronized (this) {
+ // Make sure that checking if the process job is killed and creating an AzkabanProcess
+ // object are atomic. The cancel method relies on this to make sure that if this.process is
+ // null, this block of code, which includes checking if the job is killed, has not been
+ // executed yet.
+ if (this.killed) {
+ info("The job is killed. Abort. No job process created.");
+ return;
+ }
+ this.process = builder.build();
+ }
try {
- if (!this.killed) {
this.process.run();
- success = true;
- }
+ this.success = true;
} catch (final Throwable e) {
for (final File file : propFiles) {
if (file != null && file.exists()) {
@@ -270,7 +282,7 @@ public class ProcessJob extends AbstractProcessJob {
throw new RuntimeException(e);
} finally {
info("Process completed "
- + (success ? "successfully" : "unsuccessfully") + " in "
+ + (this.success ? "successfully" : "unsuccessfully") + " in "
+ ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
}
}
@@ -368,11 +380,14 @@ public class ProcessJob extends AbstractProcessJob {
synchronized (this) {
this.killed = true;
this.notify();
+ if (this.process == null) {
+ // The job thread has not checked if the job is killed yet.
+ // Setting the killed flag should be enough to abort the job.
+ // There is no job process to kill.
+ return;
+ }
}
-
- if (this.process == null) {
- throw new IllegalStateException("Not started.");
- }
+ this.process.awaitStartup();
final boolean processkilled = this.process
.softKill(KILL_TIME.toMillis(), TimeUnit.MILLISECONDS);
if (!processkilled) {
@@ -390,6 +405,16 @@ public class ProcessJob extends AbstractProcessJob {
return this.process.getProcessId();
}
+ @VisibleForTesting
+ boolean isSuccess() {
+ return this.success;
+ }
+
+ @VisibleForTesting
+ AzkabanProcess getProcess() {
+ return this.process;
+ }
+
public String getPath() {
return this._jobPath == null ? "" : this._jobPath;
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 5683d9f..4175d58 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -150,6 +150,9 @@ public class AzkabanProcess {
/**
* Await the start of this process
*
+ * When this method returns, the job process has been created and this.processId has been
+ * set.
+ *
* @throws InterruptedException if the thread is interrupted while waiting.
*/
public void awaitStartup() throws InterruptedException {
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index c75211b..8521a10 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -17,7 +17,6 @@
package azkaban.jobExecutor;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;
@@ -177,8 +176,8 @@ public class ProcessJobTest {
}
/**
- * test cancellation of the job before associated process is constructed
- * expect job will be cancelled successfully
+ * Test cancellation of the job before the associated process is constructed. Expect the job
+ * to be cancelled successfully.
*/
@Test
public void testCancelDuringPreparation() throws InterruptedException, ExecutionException {
@@ -198,12 +197,36 @@ public class ProcessJobTest {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final Future future = executorService.submit(sleepBeforeRunJob);
- Thread.sleep(1000);
- assertThatThrownBy(() -> sleepBeforeRunJob.cancel()).hasMessage("Not started.");
+ sleepBeforeRunJob.cancel();
future.get();
assertThat(sleepBeforeRunJob.getProgress()).isEqualTo(0.0);
}
+ @Test
+ public void testCancelAfterJobProcessCreation() throws InterruptedException, ExecutionException {
+ this.props.put(ProcessJob.COMMAND, "sleep 1");
+
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final Future future = executorService.submit(() -> {
+ try {
+ this.job.run();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ // Wait for the AzkabanProcess object to be created before calling job.cancel so that cancel
+ // has an existing job process to kill.
+ while (this.job.getProcess() == null) {
+ Thread.sleep(1);
+ }
+
+ this.job.cancel();
+ // Wait for the job to finish before testing its state.
+ future.get();
+ assertThat(this.job.isSuccess()).isFalse();
+ }
+
static class SleepBeforeRunJob extends ProcessJob implements Runnable {
public SleepBeforeRunJob(final String jobId, final Props sysProps, final Props jobProps,
@@ -214,13 +237,12 @@ public class ProcessJobTest {
@Override
public void run() {
try {
- info("sleep for 3 seconds before actually running the job");
- Thread.sleep(3 * 1000);
+ info("sleep for some time before actually running the job");
+ Thread.sleep(10);
super.run();
} catch (final Exception ex) {
this.getLog().error(ex);
}
}
}
-
}