azkaban-aplcache

Fix mismatch in name of job log files (#2042) * Fix comments

11/29/2018 8:17:40 PM

Details

diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
index a729c84..279823a 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopHiveJob.java
@@ -291,11 +291,8 @@ public class HadoopHiveJob extends JavaProcessJob {
 
     info("Cancel called.  Killing the Hive launched MR jobs on the cluster");
 
-    String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
-    final String logFilePath =
-        String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
-            getId());
-    info("log file path is: " + logFilePath);
+    final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+    info("Log file path is: " + logFilePath);
 
     HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, getLog());
   }
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
index ed902ba..ddc3f23 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJavaJob.java
@@ -264,11 +264,8 @@ public class HadoopJavaJob extends JavaProcessJob {
 
     info("Cancel called.  Killing the launched MR jobs on the cluster");
 
-    String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
-    final String logFilePath =
-        String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
-            getId());
-    info("log file path is: " + logFilePath);
+    final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+    info("Log file path is: " + logFilePath);
 
     HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps,
         tokenFile, getLog());
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
index 2800154..10d4053 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopPigJob.java
@@ -396,11 +396,8 @@ public class HadoopPigJob extends JavaProcessJob {
 
     info("Cancel called.  Killing the Pig launched MR jobs on the cluster");
 
-    String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
-    final String logFilePath =
-        String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
-            getId());
-    info("log file path is: " + logFilePath);
+    final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+    info("Log file path is: " + logFilePath);
 
     HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps,
         tokenFile, getLog());
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java
index b9764e9..e37b918 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopShell.java
@@ -137,9 +137,8 @@ public class HadoopShell extends ProcessJob {
 
 		info("Cancel called.  Killing the launched Hadoop jobs on the cluster");
 
-		String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
-		final String logFilePath = String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId, getId());
-		info("log file path is: " + logFilePath);
+		final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+		info("Log file path is: " + logFilePath);
 
 		HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, getLog());
 	}
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java
index 2373b0c..db53356 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopSparkJob.java
@@ -681,11 +681,8 @@ public class HadoopSparkJob extends JavaProcessJob {
 
     info("Cancel called.  Killing the Spark job on the cluster");
 
-    final String azExecId = this.jobProps.getString(CommonJobProperties.EXEC_ID);
-    final String logFilePath =
-        String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
-            getId());
-    info("log file path is: " + logFilePath);
+    final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
+    info("Log file path is: " + logFilePath);
 
     HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, this.jobProps,
         this.tokenFile, getLog());
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 24ed874..0c57d7e 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -75,17 +75,22 @@ public class CommonJobProperties {
   public static final String JOB_ATTEMPT = "azkaban.job.attempt";
 
   /**
-   * The attempt number of the executing job.
+   * The job's metadata file name.
    */
   public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
 
   /**
-   * The attempt number of the executing job.
+   * The job's attachment file absolute path.
    */
   public static final String JOB_ATTACHMENT_FILE =
       "azkaban.job.attachment.file";
 
   /**
+   * The job's log file absolute path.
+   */
+  public static final String JOB_LOG_FILE = "azkaban.job.log.file";
+  
+  /**
    * The executing flow id
    */
   public static final String FLOW_ID = "azkaban.flow.flowid";
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 4037282..5a6a7bc 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -49,6 +49,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+
 import org.apache.kafka.log4jappender.KafkaLog4jAppender;
 import org.apache.log4j.Appender;
 import org.apache.log4j.EnhancedPatternLayout;
@@ -321,6 +322,7 @@ public class JobRunner extends EventHandler implements Runnable {
     final String logName = createLogFileName(this.node);
     this.logFile = new File(this.workingDir, logName);
     final String absolutePath = this.logFile.getAbsolutePath();
+    this.flowLogger.info("Log file path for job: " + this.jobId + " is: " + absolutePath);
 
     // Attempt to create FileAppender
     final RollingFileAppender fileAppender =
@@ -671,6 +673,7 @@ public class JobRunner extends EventHandler implements Runnable {
       this.props.put(CommonJobProperties.JOB_METADATA_FILE,
           createMetaDataFileName(this.node));
       this.props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, this.attachmentFileName);
+      this.props.put(CommonJobProperties.JOB_LOG_FILE, this.logFile.getAbsolutePath());
       finalStatus = changeStatus(Status.RUNNING);
 
       // Ability to specify working directory
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 6c03c4b..28c525e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -29,17 +29,19 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
+import azkaban.flow.CommonJobProperties;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.spi.EventType;
 import azkaban.test.TestUtils;
 import azkaban.utils.Props;
 import java.io.BufferedReader;
-import java.io.InputStreamReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -96,7 +98,7 @@ public class JobRunnerTest {
 
     eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
-        || runner.getStatus() != Status.FAILED);
+        && runner.getStatus() != Status.FAILED);
 
     runner.run();
     eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
@@ -110,6 +112,9 @@ public class JobRunnerTest {
     final File logFile = new File(runner.getLogFilePath());
     final Props outputProps = runner.getNode().getOutputProps();
     Assert.assertTrue(outputProps != null);
+
+    checkRequiredJobProperties(runner, logFile);
+
     try (final BufferedReader br = getLogReader(logFile)) {
       final String firstLine = br.readLine();
       Assert.assertTrue("Unexpected default layout",
@@ -124,6 +129,26 @@ public class JobRunnerTest {
         .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
+  private void checkRequiredJobProperties(JobRunner runner, File logFile) {
+    Field jobField = null;
+    try {
+      jobField = runner.getClass().getDeclaredField("job");
+    } catch (NoSuchFieldException e) {
+      Assert.fail("'job' field not found");
+    }
+    jobField.setAccessible(true);
+    InteractiveTestJob job = null;
+    try {
+      job = (InteractiveTestJob) jobField.get(runner);
+    } catch (IllegalAccessException e) {
+      Assert.fail("'job' field not accessible");
+    }
+    Props jobProps = job.getJobProps();
+    Assert.assertEquals("Unexpected log file path in properties",
+        logFile.getAbsolutePath(),
+        jobProps.get(CommonJobProperties.JOB_LOG_FILE));
+  }
+
   private BufferedReader getLogReader(File logFile) throws FileNotFoundException {
     return new BufferedReader(new InputStreamReader(new FileInputStream(logFile),
         Charset.defaultCharset()));