diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 2ee9d67..c91cfe5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -345,26 +345,31 @@ public class JobRunner extends EventHandler implements Runnable {
this.azkabanProps
.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
+ final JSONObject layout = createLogPatternLayoutJsonObject(props, jobId);
+
+ kafkaProducer.setLayout(new PatternLayoutEscaped(layout.toString()));
+ kafkaProducer.activateOptions();
+
+ this.flowLogger.info("Created kafka appender for " + this.jobId);
+ return kafkaProducer;
+ }
+
+ private static JSONObject createLogPatternLayoutJsonObject(Props props, String jobId) {
final JSONObject layout = new JSONObject();
layout.put("category", "%c{1}");
layout.put("level", "%p");
layout.put("message", "%m");
layout.put("projectname",
- this.props.getString(Constants.FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
- layout.put("flowid", this.props.getString(Constants.FlowProperties.AZKABAN_FLOW_FLOW_ID));
- layout.put("jobid", this.jobId);
+ props.getString(Constants.FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
+ layout.put("flowid", props.getString(Constants.FlowProperties.AZKABAN_FLOW_FLOW_ID));
+ layout.put("jobid", jobId);
layout
- .put("submituser", this.props.getString(Constants.FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
- layout.put("execid", this.props.getString(Constants.FlowProperties.AZKABAN_FLOW_EXEC_ID));
+ .put("submituser", props.getString(Constants.FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
+ layout.put("execid", props.getString(Constants.FlowProperties.AZKABAN_FLOW_EXEC_ID));
layout.put("projectversion",
- this.props.getString(Constants.FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
+ props.getString(Constants.FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
layout.put("logsource", "userJob");
-
- kafkaProducer.setLayout(new PatternLayoutEscaped(layout.toString()));
- kafkaProducer.activateOptions();
-
- this.flowLogger.info("Created kafka appender for " + this.jobId);
- return kafkaProducer;
+ return layout;
}
private void removeAppender(final Optional<Appender> appender) {