diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
index a729c84..279823a 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
@@ -291,11 +291,8 @@ public class HadoopHiveJob extends JavaProcessJob {
info("Cancel called. Killing the Hive launched MR jobs on the cluster");
- String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
- final String logFilePath =
- String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
- getId());
- info("log file path is: " + logFilePath);
+ final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+ info("Log file path is: " + logFilePath);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, getLog());
}
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
index ed902ba..ddc3f23 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
@@ -264,11 +264,8 @@ public class HadoopJavaJob extends JavaProcessJob {
info("Cancel called. Killing the launched MR jobs on the cluster");
- String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
- final String logFilePath =
- String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
- getId());
- info("log file path is: " + logFilePath);
+ final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+ info("Log file path is: " + logFilePath);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps,
tokenFile, getLog());
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
index 2800154..10d4053 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
@@ -396,11 +396,8 @@ public class HadoopPigJob extends JavaProcessJob {
info("Cancel called. Killing the Pig launched MR jobs on the cluster");
- String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
- final String logFilePath =
- String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
- getId());
- info("log file path is: " + logFilePath);
+ final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+ info("Log file path is: " + logFilePath);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps,
tokenFile, getLog());
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java
index b9764e9..e37b918 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java
@@ -137,9 +137,8 @@ public class HadoopShell extends ProcessJob {
info("Cancel called. Killing the launched Hadoop jobs on the cluster");
- String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
- final String logFilePath = String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId, getId());
- info("log file path is: " + logFilePath);
+ final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+ info("Log file path is: " + logFilePath);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, getLog());
}
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java
index 2373b0c..db53356 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java
@@ -681,11 +681,8 @@ public class HadoopSparkJob extends JavaProcessJob {
info("Cancel called. Killing the Spark job on the cluster");
- final String azExecId = this.jobProps.getString(CommonJobProperties.EXEC_ID);
- final String logFilePath =
- String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
- getId());
- info("log file path is: " + logFilePath);
+ final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+ info("Log file path is: " + logFilePath);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, this.jobProps,
this.tokenFile, getLog());
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 24ed874..0c57d7e 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -75,17 +75,22 @@ public class CommonJobProperties {
public static final String JOB_ATTEMPT = "azkaban.job.attempt";
/**
- * The attempt number of the executing job.
+ * The job's metadata file name.
*/
public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
/**
- * The attempt number of the executing job.
+ * The job's attachment file absolute path.
*/
public static final String JOB_ATTACHMENT_FILE =
"azkaban.job.attachment.file";
/**
+ * The job's log file absolute path.
+ */
+ public static final String JOB_LOG_FILE = "azkaban.job.log.file";
+
+ /**
* The executing flow id
*/
public static final String FLOW_ID = "azkaban.flow.flowid";
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 4037282..5a6a7bc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+
import org.apache.kafka.log4jappender.KafkaLog4jAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.EnhancedPatternLayout;
@@ -321,6 +322,7 @@ public class JobRunner extends EventHandler implements Runnable {
final String logName = createLogFileName(this.node);
this.logFile = new File(this.workingDir, logName);
final String absolutePath = this.logFile.getAbsolutePath();
+ this.flowLogger.info("Log file path for job: " + this.jobId + " is: " + absolutePath);
// Attempt to create FileAppender
final RollingFileAppender fileAppender =
@@ -671,6 +673,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.props.put(CommonJobProperties.JOB_METADATA_FILE,
createMetaDataFileName(this.node));
this.props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, this.attachmentFileName);
+ this.props.put(CommonJobProperties.JOB_LOG_FILE, this.logFile.getAbsolutePath());
finalStatus = changeStatus(Status.RUNNING);
// Ability to specify working directory
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 6c03c4b..28c525e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -29,17 +29,19 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.spi.EventType;
import azkaban.test.TestUtils;
import azkaban.utils.Props;
import java.io.BufferedReader;
-import java.io.InputStreamReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -96,7 +98,7 @@ public class JobRunnerTest {
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
- || runner.getStatus() != Status.FAILED);
+ && runner.getStatus() != Status.FAILED);
runner.run();
eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
@@ -110,6 +112,9 @@ public class JobRunnerTest {
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
+
+ checkRequiredJobProperties(runner, logFile);
+
try (final BufferedReader br = getLogReader(logFile)) {
final String firstLine = br.readLine();
Assert.assertTrue("Unexpected default layout",
@@ -124,6 +129,26 @@ public class JobRunnerTest {
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
+ private void checkRequiredJobProperties(JobRunner runner, File logFile) {
+ Field jobField = null;
+ try {
+ jobField = runner.getClass().getDeclaredField("job");
+ } catch (NoSuchFieldException e) {
+ Assert.fail("'job' field not found");
+ }
+ jobField.setAccessible(true);
+ InteractiveTestJob job = null;
+ try {
+ job = (InteractiveTestJob) jobField.get(runner);
+ } catch (IllegalAccessException e) {
+ Assert.fail("'job' field not accessible");
+ }
+ Props jobProps = job.getJobProps();
+ Assert.assertEquals("Unexpected log file path in properties",
+ logFile.getAbsolutePath(),
+ jobProps.get(CommonJobProperties.JOB_LOG_FILE));
+ }
+
private BufferedReader getLogReader(File logFile) throws FileNotFoundException {
return new BufferedReader(new InputStreamReader(new FileInputStream(logFile),
Charset.defaultCharset()));
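
A minimal sketch of how a job-type plugin might consume the new azkaban.job.log.file property while staying compatible with older executors that do not set it. This fallback pattern is an illustration only and is not part of the diff above; it reuses fields and helpers (jobProps, tokenFile, getWorkingDirectory(), getId()) exactly as they appear in the Hadoop job types in this change, and assumes Props.getString(key, defaultValue) for the optional lookup.

    // Prefer the absolute log path injected by JobRunner (this change).
    String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE, null);
    if (logFilePath == null) {
      // Older executors do not set azkaban.job.log.file; fall back to the
      // legacy naming scheme that the removed code used to build by hand.
      final String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
      logFilePath = String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId, getId());
    }
    info("Log file path is: " + logFilePath);
    HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, getLog());

Whether such a fallback is worth carrying depends on whether plugins must run against executors older than this change; within a single upgraded deployment, reading the property directly, as the diff does, is sufficient.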