azkaban-aplcache

Users should be able to use a custom job log format (#1719) Users

4/7/2018 12:36:16 AM
3.46.0

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index eb5d492..3137184 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -109,7 +109,7 @@ public class Constants {
     // Designates one of the external link topics to correspond to a job log viewer
     public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC = "azkaban.server.external.logviewer.topic";
     public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_LABEL = "azkaban.server.external.logviewer.label";
-    
+
     /*
      * Hadoop/Spark user job link.
      * Example:
@@ -267,6 +267,11 @@ public class Constants {
     public static final String MAX_XMX_DEFAULT = "2G";
     // The hadoop user the job should run under. If not specified, it will default to submit user.
     public static final String USER_TO_PROXY = "user.to.proxy";
+
+    /**
+     * Format string for Log4j's EnhancedPatternLayout
+     */
+    public static final String JOB_LOG_LAYOUT = "azkaban.job.log.layout";
   }
 
   public static class JobCallbackProperties {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 35c8b3c..4037282 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -63,8 +63,10 @@ public class JobRunner extends EventHandler implements Runnable {
 
   private static final Logger serverLogger = Logger.getLogger(JobRunner.class);
   private static final Object logCreatorLock = new Object();
-  private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
-      "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
+  private static final String DEFAULT_LAYOUT =
+      "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n";
+
   private final Object syncObject = new Object();
   private final JobTypeManager jobtypeManager;
   private final ExecutorLoader loader;
@@ -72,7 +74,7 @@ public class JobRunner extends EventHandler implements Runnable {
   private final Props azkabanProps;
   private final ExecutableNode node;
   private final File workingDir;
-  private final Layout loggerLayout = this.DEFAULT_LAYOUT;
+  private final Layout loggerLayout;
   private final String jobId;
   private final Set<String> pipelineJobs = new HashSet<>();
   private Logger logger = null;
@@ -106,6 +108,10 @@ public class JobRunner extends EventHandler implements Runnable {
     this.loader = loader;
     this.jobtypeManager = jobtypeManager;
     this.azkabanProps = azkabanProps;
+    final String jobLogLayout = props.getString(
+        JobProperties.JOB_LOG_LAYOUT, DEFAULT_LAYOUT);
+
+    this.loggerLayout = new EnhancedPatternLayout(jobLogLayout);
   }
 
   public static String createLogFileName(final ExecutableNode node, final int attempt) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index dcaa4ba..6c03c4b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -34,8 +34,15 @@ import azkaban.jobtype.JobTypePluginSet;
 import azkaban.spi.EventType;
 import azkaban.test.TestUtils;
 import azkaban.utils.Props;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
@@ -80,7 +87,7 @@ public class JobRunnerTest {
   }
 
   @Test
-  public void testBasicRun() {
+  public void testBasicRun() throws IOException {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
@@ -103,7 +110,11 @@ public class JobRunnerTest {
     final File logFile = new File(runner.getLogFilePath());
     final Props outputProps = runner.getNode().getOutputProps();
     Assert.assertTrue(outputProps != null);
-    Assert.assertTrue(logFile.exists());
+    try (final BufferedReader br = getLogReader(logFile)) {
+      final String firstLine = br.readLine();
+      Assert.assertTrue("Unexpected default layout",
+          firstLine.startsWith(new SimpleDateFormat("dd-MM-yyyy").format(new Date())));
+    }
     // Verify that user.to.proxy is default to submit user.
     Assert.assertEquals(SUBMIT_USER, runner.getProps().get(JobProperties.USER_TO_PROXY));
 
@@ -113,6 +124,11 @@ public class JobRunnerTest {
         .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
+  private BufferedReader getLogReader(File logFile) throws FileNotFoundException {
+    return new BufferedReader(new InputStreamReader(new FileInputStream(logFile),
+        Charset.defaultCharset()));
+  }
+
   @Test
   public void testFailedRun() {
     final MockExecutorLoader loader = new MockExecutorLoader();
@@ -324,8 +340,23 @@ public class JobRunnerTest {
     eventCollector.assertEvents(EventType.JOB_FINISHED);
   }
 
-  private Props createProps(final int sleepSec, final boolean fail) {
-    final Props props = new Props();
+  @Test
+  public void testCustomLogLayout() throws IOException {
+    final MockExecutorLoader loader = new MockExecutorLoader();
+    final EventCollectorListener eventCollector = new EventCollectorListener();
+    final Props azkabanProps = new Props();
+    azkabanProps.put(JobProperties.JOB_LOG_LAYOUT, "TEST %c{1} %p - %m\n");
+    final JobRunner runner =
+        createJobRunner(1, "testJob", 0, false, loader, eventCollector, azkabanProps);
+    runner.run();
+    try (final BufferedReader br = getLogReader(runner.getLogFile())) {
+      final String firstLine = br.readLine();
+      Assert.assertTrue("Unexpected default layout",
+          firstLine.startsWith("TEST"));
+    }
+  }
+
+  private Props createProps(final int sleepSec, final boolean fail, Props props) {
     props.put("type", "test");
     props.put("seconds", sleepSec);
     props.put("fail", String.valueOf(fail));
@@ -334,6 +365,11 @@ public class JobRunnerTest {
 
   private JobRunner createJobRunner(final int execId, final String name, final int time,
       final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener) {
+    return createJobRunner(execId, name, time, fail, loader, listener, new Props());
+  }
+
+  private JobRunner createJobRunner(final int execId, final String name, final int time,
+      final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener, Props jobProps) {
     final Props azkabanProps = new Props();
     final ExecutableFlow flow = new ExecutableFlow();
     flow.setExecutionId(execId);
@@ -342,7 +378,7 @@ public class JobRunnerTest {
     node.setId(name);
     node.setParentFlow(flow);
 
-    final Props props = createProps(time, fail);
+    final Props props = createProps(time, fail, jobProps);
     node.setInputProps(props);
     final HashSet<String> proxyUsers = new HashSet<>();
     proxyUsers.add(flow.getSubmitUser());