diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index eb5d492..3137184 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -109,7 +109,7 @@ public class Constants {
// Designates one of the external link topics to correspond to a job log viewer
public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC = "azkaban.server.external.logviewer.topic";
public static final String AZKABAN_SERVER_EXTERNAL_LOGVIEWER_LABEL = "azkaban.server.external.logviewer.label";
-
+
/*
* Hadoop/Spark user job link.
* Example:
@@ -267,6 +267,11 @@ public class Constants {
public static final String MAX_XMX_DEFAULT = "2G";
// The hadoop user the job should run under. If not specified, it will default to submit user.
public static final String USER_TO_PROXY = "user.to.proxy";
+
+ /**
+ * Format string for Log4j's EnhancedPatternLayout
+ */
+ public static final String JOB_LOG_LAYOUT = "azkaban.job.log.layout";
}
public static class JobCallbackProperties {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 35c8b3c..4037282 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -63,8 +63,10 @@ public class JobRunner extends EventHandler implements Runnable {
private static final Logger serverLogger = Logger.getLogger(JobRunner.class);
private static final Object logCreatorLock = new Object();
- private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
- "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+
+ private static final String DEFAULT_LAYOUT =
+ "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n";
+
private final Object syncObject = new Object();
private final JobTypeManager jobtypeManager;
private final ExecutorLoader loader;
@@ -72,7 +74,7 @@ public class JobRunner extends EventHandler implements Runnable {
private final Props azkabanProps;
private final ExecutableNode node;
private final File workingDir;
- private final Layout loggerLayout = this.DEFAULT_LAYOUT;
+ private final Layout loggerLayout;
private final String jobId;
private final Set<String> pipelineJobs = new HashSet<>();
private Logger logger = null;
@@ -106,6 +108,10 @@ public class JobRunner extends EventHandler implements Runnable {
this.loader = loader;
this.jobtypeManager = jobtypeManager;
this.azkabanProps = azkabanProps;
+ final String jobLogLayout = props.getString(
+ JobProperties.JOB_LOG_LAYOUT, DEFAULT_LAYOUT);
+
+ this.loggerLayout = new EnhancedPatternLayout(jobLogLayout);
}
public static String createLogFileName(final ExecutableNode node, final int attempt) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index dcaa4ba..6c03c4b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -34,8 +34,15 @@ import azkaban.jobtype.JobTypePluginSet;
import azkaban.spi.EventType;
import azkaban.test.TestUtils;
import azkaban.utils.Props;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
@@ -80,7 +87,7 @@ public class JobRunnerTest {
}
@Test
- public void testBasicRun() {
+ public void testBasicRun() throws IOException {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
@@ -103,7 +110,11 @@ public class JobRunnerTest {
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
- Assert.assertTrue(logFile.exists());
+ try (final BufferedReader br = getLogReader(logFile)) {
+ final String firstLine = br.readLine();
+ Assert.assertTrue("Unexpected default layout",
+ firstLine.startsWith(new SimpleDateFormat("dd-MM-yyyy").format(new Date())));
+ }
// Verify that user.to.proxy is default to submit user.
Assert.assertEquals(SUBMIT_USER, runner.getProps().get(JobProperties.USER_TO_PROXY));
@@ -113,6 +124,11 @@ public class JobRunnerTest {
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
+ private BufferedReader getLogReader(File logFile) throws FileNotFoundException {
+ return new BufferedReader(new InputStreamReader(new FileInputStream(logFile),
+ Charset.defaultCharset()));
+ }
+
@Test
public void testFailedRun() {
final MockExecutorLoader loader = new MockExecutorLoader();
@@ -324,8 +340,23 @@ public class JobRunnerTest {
eventCollector.assertEvents(EventType.JOB_FINISHED);
}
- private Props createProps(final int sleepSec, final boolean fail) {
- final Props props = new Props();
+ @Test
+ public void testCustomLogLayout() throws IOException {
+ final MockExecutorLoader loader = new MockExecutorLoader();
+ final EventCollectorListener eventCollector = new EventCollectorListener();
+ final Props azkabanProps = new Props();
+ azkabanProps.put(JobProperties.JOB_LOG_LAYOUT, "TEST %c{1} %p - %m\n");
+ final JobRunner runner =
+ createJobRunner(1, "testJob", 0, false, loader, eventCollector, azkabanProps);
+ runner.run();
+ try (final BufferedReader br = getLogReader(runner.getLogFile())) {
+ final String firstLine = br.readLine();
+ Assert.assertTrue("Unexpected default layout",
+ firstLine.startsWith("TEST"));
+ }
+ }
+
+ private Props createProps(final int sleepSec, final boolean fail, Props props) {
props.put("type", "test");
props.put("seconds", sleepSec);
props.put("fail", String.valueOf(fail));
@@ -334,6 +365,11 @@ public class JobRunnerTest {
private JobRunner createJobRunner(final int execId, final String name, final int time,
final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener) {
+ return createJobRunner(execId, name, time, fail, loader, listener, new Props());
+ }
+
+ private JobRunner createJobRunner(final int execId, final String name, final int time,
+ final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener, Props jobProps) {
final Props azkabanProps = new Props();
final ExecutableFlow flow = new ExecutableFlow();
flow.setExecutionId(execId);
@@ -342,7 +378,7 @@ public class JobRunnerTest {
node.setId(name);
node.setParentFlow(flow);
- final Props props = createProps(time, fail);
+ final Props props = createProps(time, fail, jobProps);
node.setInputProps(props);
final HashSet<String> proxyUsers = new HashSet<>();
proxyUsers.add(flow.getSubmitUser());