azkaban-aplcache

Option to create Kafka Appender for user job logs (#797) *

11/23/2016 10:07:00 PM

Changes

Details

diff --git a/azkaban-common/src/main/java/azkaban/constants/FlowProperties.java b/azkaban-common/src/main/java/azkaban/constants/FlowProperties.java
new file mode 100644
index 0000000..c41b925
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/constants/FlowProperties.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.constants;
+
+public class FlowProperties {
+  // Basic properties of flows as set by the executor server
+  public static final String AZKABAN_FLOW_PROJECT_NAME = "azkaban.flow.projectname";
+  public static final String AZKABAN_FLOW_FLOW_ID = "azkaban.flow.flowid";
+  public static final String AZKABAN_FLOW_SUBMIT_USER = "azkaban.flow.submituser";
+  public static final String AZKABAN_FLOW_EXEC_ID = "azkaban.flow.execid";
+  public static final String AZKABAN_FLOW_PROJECT_VERSION = "azkaban.flow.projectversion";
+}
diff --git a/azkaban-common/src/main/java/azkaban/constants/JobProperties.java b/azkaban-common/src/main/java/azkaban/constants/JobProperties.java
new file mode 100644
index 0000000..17f2c42
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/constants/JobProperties.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.constants;
+
+public class JobProperties {
+  // Job property that enables/disables using Kafka logging of user job logs
+  public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
+}
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
new file mode 100644
index 0000000..7aa56b2
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.constants;
+
+public class ServerProperties {
+  // These properties are configurable through azkaban.properties
+
+  // Configures the Kafka appender for logging user jobs, specified for the exec server
+  public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
+  public static final String AZKABAN_SERVER_LOGGING_KAFKA_TOPIC = "azkaban.server.logging.kafka.topic";
+}
diff --git a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
index 9546fb9..c433cce 100644
--- a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
+++ b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
@@ -16,6 +16,8 @@
 
 package azkaban.server;
 
+import azkaban.constants.ServerInternals;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -37,7 +39,7 @@ public class AbstractServiceServlet extends HttpServlet {
   @Override
   public void init(ServletConfig config) throws ServletException {
     application =
-        (AzkabanServer) config.getServletContext().getAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY);
+        (AzkabanServer) config.getServletContext().getAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
 
     if (application == null) {
       throw new IllegalStateException(
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index d9ca84b..5f726c0 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -28,6 +28,7 @@ import joptsimple.OptionSpec;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
 
+import azkaban.constants.ServerInternals;
 import azkaban.user.UserManager;
 import azkaban.utils.Props;
 import azkaban.server.session.SessionCache;
@@ -79,8 +80,8 @@ public abstract class AzkabanServer {
 
   private static Props loadAzkabanConfigurationFromDirectory(File dir) {
     File azkabanPrivatePropsFile =
-        new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
-    File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
+        new File(dir, ServerInternals.AZKABAN_PRIVATE_PROPERTIES_FILE);
+    File azkabanPropsFile = new File(dir, ServerInternals.AZKABAN_PROPERTIES_FILE);
 
     Props props = null;
     try {
@@ -124,7 +125,7 @@ public abstract class AzkabanServer {
       return null;
     }
 
-    File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
+    File confPath = new File(azkabanHome, ServerInternals.DEFAULT_CONF_PATH);
     if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
       logger
           .error(azkabanHome + " does not contain a readable conf directory.");
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 93ca4f6..68e39ec 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -2,6 +2,8 @@ apply plugin: 'distribution'
 
 dependencies {
   compile(project(':azkaban-common'))
+  compile('org.apache.kafka:kafka-log4j-appender:0.10.0.0')
+  compile('com.googlecode.json-simple:json-simple:1.1.1')
 
   testCompile('org.hamcrest:hamcrest-all:1.3')
   testCompile(project(':azkaban-common').sourceSets.test.output)
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 5f832d5..21be9a4 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,6 +44,7 @@ import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import azkaban.constants.ServerInternals;
 import azkaban.execapp.event.JobCallbackManager;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -64,12 +65,11 @@ import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.server.AzkabanServer;
-import azkaban.server.Constants;
 import azkaban.utils.Props;
 import azkaban.utils.SystemMemoryInfo;
 import azkaban.utils.Utils;
 
-import static azkaban.server.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.constants.ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
@@ -171,7 +171,7 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
     root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
 
-    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+    root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, this);
     return server;
   }
 
@@ -418,7 +418,7 @@ public class AzkabanExecutorServer {
       return null;
     }
 
-    File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
+    File confPath = new File(azkabanHome, ServerInternals.DEFAULT_CONF_PATH);
     if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
       logger
           .error(azkabanHome + " does not contain a readable conf directory.");
@@ -439,8 +439,8 @@ public class AzkabanExecutorServer {
    */
   private static Props loadAzkabanConfigurationFromDirectory(File dir) {
     File azkabanPrivatePropsFile =
-        new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
-    File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
+        new File(dir, ServerInternals.AZKABAN_PRIVATE_PROPERTIES_FILE);
+    File azkabanPropsFile = new File(dir, ServerInternals.AZKABAN_PROPERTIES_FILE);
 
     Props props = null;
     try {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 482d36f..3d618d2 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -35,12 +35,12 @@ import org.apache.log4j.Logger;
 
 import org.codehaus.jackson.map.ObjectMapper;
 
+import azkaban.constants.ServerInternals;
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
-import azkaban.server.Constants;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
@@ -62,7 +62,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
   public void init(ServletConfig config) throws ServletException {
     application =
         (AzkabanExecutorServer) config.getServletContext().getAttribute(
-            Constants.AZKABAN_SERVLET_CONTEXT_KEY);
+            ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
 
     if (application == null) {
       throw new IllegalStateException(
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 3e90c13..1edbe80 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -96,6 +96,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private Object mainSyncObj = new Object();
 
   // Properties map
+  private Props azkabanProps;
   private Map<String, Props> sharedProps = new HashMap<String, Props>();
   private final JobTypeManager jobtypeManager;
 
@@ -137,9 +138,9 @@ public class FlowRunner extends EventHandler implements Runnable {
    * @throws ExecutorManagerException
    */
   public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
-      ProjectLoader projectLoader, JobTypeManager jobtypeManager)
+      ProjectLoader projectLoader, JobTypeManager jobtypeManager, Props azkabanProps)
       throws ExecutorManagerException {
-    this(flow, executorLoader, projectLoader, jobtypeManager, null);
+    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
   }
 
   /**
@@ -155,7 +156,7 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, JobTypeManager jobtypeManager,
-      ExecutorService executorService) throws ExecutorManagerException {
+      ExecutorService executorService, Props azkabanProps) throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
     this.executorLoader = executorLoader;
@@ -170,6 +171,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.proxyUsers = flow.getProxyUsers();
     this.executorService = executorService;
     this.finishedNodes = new SwapQueue<ExecutableNode>();
+    this.azkabanProps = azkabanProps;
   }
 
   public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -820,7 +822,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
     JobRunner jobRunner =
         new JobRunner(node, path.getParentFile(), executorLoader,
-            jobtypeManager);
+            jobtypeManager, azkabanProps);
     if (watcher != null) {
       jobRunner.setPipeline(watcher, pipelineLevel);
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 01ab373..eed51ad 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -497,7 +497,7 @@ public class FlowRunnerManager implements EventListener,
     }
 
     FlowRunner runner =
-        new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+        new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager, azkabanProps);
     runner.setFlowWatcher(watcher)
         .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
         .setValidateProxyUser(validateProxyUser)
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
index e3d42cd..2141507 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
@@ -32,9 +32,9 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.log4j.Logger;
 
+import azkaban.constants.ServerInternals;
 import azkaban.executor.ConnectorParams;
 import azkaban.server.HttpRequestUtils;
-import azkaban.server.Constants;
 import azkaban.utils.JSONUtils;
 
 public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
@@ -45,7 +45,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
   public void init(ServletConfig config) throws ServletException {
     server =
         (AzkabanExecutorServer) config.getServletContext().getAttribute(
-            Constants.AZKABAN_SERVLET_CONTEXT_KEY);
+            ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
   }
 
   public boolean hasParam(HttpServletRequest request, String param) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index eaa0bbc..dbd5b80 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -24,13 +24,22 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Optional;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.EnhancedPatternLayout;
+import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.RollingFileAppender;
 
+import org.apache.kafka.log4jappender.KafkaLog4jAppender;
+
+import org.json.simple.JSONObject;
+
+import azkaban.constants.FlowProperties;
+import azkaban.constants.JobProperties;
+import azkaban.constants.ServerProperties;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventData;
@@ -50,6 +59,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
 import azkaban.utils.Props;
 import azkaban.utils.StringUtils;
+import azkaban.utils.UndefinedPropertyException;
 
 public class JobRunner extends EventHandler implements Runnable {
   public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
@@ -59,6 +69,7 @@ public class JobRunner extends EventHandler implements Runnable {
 
   private ExecutorLoader loader;
   private Props props;
+  private Props azkabanProps;
   private ExecutableNode node;
   private File workingDir;
 
@@ -66,7 +77,8 @@ public class JobRunner extends EventHandler implements Runnable {
   private Layout loggerLayout = DEFAULT_LAYOUT;
   private Logger flowLogger = null;
 
-  private Appender jobAppender;
+  private Appender jobAppender = null;
+  private Optional<Appender> kafkaAppender = Optional.empty();
   private File logFile;
   private String attachmentFileName;
 
@@ -94,7 +106,7 @@ public class JobRunner extends EventHandler implements Runnable {
   private BlockingStatus currentBlockStatus = null;
 
   public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader,
-      JobTypeManager jobtypeManager) {
+      JobTypeManager jobtypeManager, Props azkabanProps) {
     this.props = node.getInputProps();
     this.node = node;
     this.workingDir = workingDir;
@@ -103,6 +115,7 @@ public class JobRunner extends EventHandler implements Runnable {
     this.jobId = node.getId();
     this.loader = loader;
     this.jobtypeManager = jobtypeManager;
+    this.azkabanProps = azkabanProps;
   }
 
   public void setValidatedProxyUsers(Set<String> proxyUsers) {
@@ -210,38 +223,116 @@ public class JobRunner extends EventHandler implements Runnable {
               + this.jobId;
       logger = Logger.getLogger(loggerName);
 
-      // Create file appender
-      String logName = createLogFileName(node);
-      logFile = new File(workingDir, logName);
-
-      String absolutePath = logFile.getAbsolutePath();
-
-      jobAppender = null;
       try {
-        RollingFileAppender fileAppender =
-            new RollingFileAppender(loggerLayout, absolutePath, true);
-        fileAppender.setMaxBackupIndex(jobLogBackupIndex);
-        fileAppender.setMaxFileSize(jobLogChunkSize);
-        jobAppender = fileAppender;
-        logger.addAppender(jobAppender);
-        logger.setAdditivity(false);
+        attachFileAppender(createFileAppender());
       } catch (IOException e) {
+        removeAppender(jobAppender);
         flowLogger.error("Could not open log file in " + workingDir
             + " for job " + this.jobId, e);
       }
+
+      if (props.getBoolean(JobProperties.AZKABAN_JOB_LOGGING_KAFKA_ENABLE, false)) {
+        // Only attempt appender construction if required properties are present
+        if (azkabanProps.containsKey(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST)
+            && azkabanProps.containsKey(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC)) {
+          try {
+            attachKafkaAppender(createKafkaAppender());
+          } catch (Exception e) {
+            removeAppender(kafkaAppender);
+            flowLogger.error("Failed to create Kafka appender for job " + this.jobId, e);
+          }
+        } else {
+          flowLogger.info("Kafka appender not created as brokerlist or topic not provided by executor server");
+        }
+      }
     }
   }
 
+  private void attachFileAppender(FileAppender appender) {
+    // If present, remove the existing file appender
+    assert(jobAppender == null);
+
+    jobAppender = appender;
+    logger.addAppender(jobAppender);
+    logger.setAdditivity(false);
+    flowLogger.info("Attached file appender for job " + this.jobId);
+  }
+
+  private FileAppender createFileAppender() throws IOException {
+    // Set up log files
+    String logName = createLogFileName(node);
+    logFile = new File(workingDir, logName);
+    String absolutePath = logFile.getAbsolutePath();
+
+    // Attempt to create FileAppender
+    RollingFileAppender fileAppender =
+        new RollingFileAppender(loggerLayout, absolutePath, true);
+    fileAppender.setMaxBackupIndex(jobLogBackupIndex);
+    fileAppender.setMaxFileSize(jobLogChunkSize);
+
+    flowLogger.info("Created file appender for job " + this.jobId);
+    return fileAppender;
+  }
+
   private void createAttachmentFile() {
     String fileName = createAttachmentFileName(node);
     File file = new File(workingDir, fileName);
     attachmentFileName = file.getAbsolutePath();
   }
 
+  private void attachKafkaAppender(KafkaLog4jAppender appender) {
+    // This should only be called once
+    assert(!kafkaAppender.isPresent());
+
+    kafkaAppender = Optional.of(appender);
+    logger.addAppender(kafkaAppender.get());
+    logger.setAdditivity(false);
+    flowLogger.info("Attached new Kafka appender for job " + this.jobId);
+  }
+
+  private KafkaLog4jAppender createKafkaAppender() throws UndefinedPropertyException {
+    KafkaLog4jAppender kafkaProducer = new KafkaLog4jAppender();
+    kafkaProducer.setSyncSend(false);
+    kafkaProducer.setBrokerList(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST));
+    kafkaProducer.setTopic(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
+
+    JSONObject layout = new JSONObject();
+    layout.put("category", "%c{1}");
+    layout.put("level", "%p");
+    layout.put("message", "%m");
+    layout.put("projectname", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
+    layout.put("flowid", props.getString(FlowProperties.AZKABAN_FLOW_FLOW_ID));
+    layout.put("submituser", props.getString(FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
+    layout.put("execid", props.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
+    layout.put("projectversion", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
+    layout.put("logsource", "userJob");
+
+    kafkaProducer.setLayout(new EnhancedPatternLayout(layout.toString()));
+    kafkaProducer.activateOptions();
+
+    flowLogger.info("Created kafka appender for " + this.jobId);
+    return kafkaProducer;
+  }
+
+  private void removeAppender(Optional<Appender> appender) {
+    if (appender.isPresent()) {
+      removeAppender(appender.get());
+    }
+  }
+
+  private void removeAppender(Appender appender) {
+    if (appender != null) {
+      logger.removeAppender(appender);
+      appender.close();
+    }
+  }
+
   private void closeLogger() {
     if (jobAppender != null) {
-      logger.removeAppender(jobAppender);
-      jobAppender.close();
+      removeAppender(jobAppender);
+    }
+    if (kafkaAppender.isPresent()) {
+      removeAppender(kafkaAppender);
     }
   }
 
@@ -556,8 +647,7 @@ public class JobRunner extends EventHandler implements Runnable {
    * know what executions initiated their execution.
    */
   private void insertJobMetadata() {
-    Props azkProps = AzkabanExecutorServer.getApp().getAzkabanProps();
-    String baseURL = azkProps.get(AZKABAN_WEBSERVER_URL);
+    String baseURL = azkabanProps.get(AZKABAN_WEBSERVER_URL);
     if (baseURL != null) {
       String flowName = node.getParentFlow().getFlowId();
       String projectName = node.getParentFlow().getProjectName();
@@ -754,4 +844,4 @@ public class JobRunner extends EventHandler implements Runnable {
     return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId
         + ".attach" : "_job." + executionId + "." + jobId + ".attach";
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index a75f9b6..0570435 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -43,6 +43,7 @@ import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.MockProjectLoader;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
 
 public class LocalFlowWatcherTest {
   private File workingDir;
@@ -237,6 +238,12 @@ public class LocalFlowWatcherTest {
   private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
       EventCollectorListener eventCollector, String flowName, int execId,
       FlowWatcher watcher, Integer pipeline) throws Exception {
+    return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline, new Props());
+  }
+
+  private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
+      EventCollectorListener eventCollector, String flowName, int execId,
+      FlowWatcher watcher, Integer pipeline, Props azkabanProps) throws Exception {
     File testDir = new File("unit/executions/exectest1");
     ExecutableFlow exFlow =
         prepareExecDir(workingDir, testDir, flowName, execId);
@@ -250,7 +257,7 @@ public class LocalFlowWatcherTest {
 
     loader.uploadExecutableFlow(exFlow);
     FlowRunner runner =
-        new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+        new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 4cbc794..2b7197c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -44,6 +44,7 @@ import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.MockProjectLoader;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
 
 public class RemoteFlowWatcherTest {
   private File workingDir;
@@ -238,6 +239,12 @@ public class RemoteFlowWatcherTest {
   private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
       EventCollectorListener eventCollector, String flowName, int execId,
       FlowWatcher watcher, Integer pipeline) throws Exception {
+    return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline, new Props());
+  }
+
+  private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
+      EventCollectorListener eventCollector, String flowName, int execId,
+      FlowWatcher watcher, Integer pipeline, Props azkabanProps) throws Exception {
     File testDir = new File("unit/executions/exectest1");
     ExecutableFlow exFlow =
         prepareExecDir(workingDir, testDir, flowName, execId);
@@ -251,7 +258,7 @@ public class RemoteFlowWatcherTest {
 
     loader.uploadExecutableFlow(exFlow);
     FlowRunner runner =
-        new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+        new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index e29e973..c53cf99 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -677,11 +677,17 @@ public class FlowRunnerPipelineTest {
   private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
       String flowName, String groupName) throws Exception {
     return createFlowRunner(eventCollector, flowName, groupName,
-        new ExecutionOptions());
+        new ExecutionOptions(), new Props());
   }
 
   private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
-      String flowName, String groupName, ExecutionOptions options)
+      String flowName, String groupName, ExecutionOptions options) throws Exception {
+    return createFlowRunner(eventCollector, flowName, groupName,
+        options, new Props());
+  }
+
+  private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
+      String flowName, String groupName, ExecutionOptions options, Props azkabanProps)
       throws Exception {
     Flow flow = flowMap.get(flowName);
 
@@ -698,7 +704,7 @@ public class FlowRunnerPipelineTest {
 
     FlowRunner runner =
         new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId),
-            fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+            fakeExecutorLoader, fakeProjectLoader, jobtypeManager, azkabanProps);
 
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 422f622..f79f2fe 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -226,6 +226,11 @@ public class FlowRunnerPropertyResolutionTest {
 
   private FlowRunner createFlowRunner(String flowName,
       HashMap<String, String> flowParams) throws Exception {
+    return createFlowRunner(flowName, flowParams, new Props());
+  }
+
+  private FlowRunner createFlowRunner(String flowName,
+      HashMap<String, String> flowParams, Props azkabanProps) throws Exception {
     Flow flow = flowMap.get(flowName);
 
     int exId = id++;
@@ -238,7 +243,7 @@ public class FlowRunnerPropertyResolutionTest {
 
     FlowRunner runner =
         new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId),
-            fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+            fakeExecutorLoader, fakeProjectLoader, jobtypeManager, azkabanProps);
     return runner;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 8b2c6d4..c4289e4 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -45,6 +45,7 @@ import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.MockProjectLoader;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
 
 public class FlowRunnerTest {
   private File workingDir;
@@ -455,7 +456,12 @@ public class FlowRunnerTest {
   }
 
   private FlowRunner createFlowRunner(ExecutableFlow flow,
-      ExecutorLoader loader, EventCollectorListener eventCollector)
+      ExecutorLoader loader, EventCollectorListener eventCollector) throws Exception {
+    return createFlowRunner(flow, loader, eventCollector, new Props());
+  }
+
+  private FlowRunner createFlowRunner(ExecutableFlow flow,
+      ExecutorLoader loader, EventCollectorListener eventCollector, Props azkabanProps)
       throws Exception {
     // File testDir = new File("unit/executions/exectest1");
     // MockProjectLoader projectLoader = new MockProjectLoader(new
@@ -463,7 +469,7 @@ public class FlowRunnerTest {
 
     loader.uploadExecutableFlow(flow);
     FlowRunner runner =
-        new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+        new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
 
     runner.addListener(eventCollector);
 
@@ -472,6 +478,11 @@ public class FlowRunnerTest {
 
   private FlowRunner createFlowRunner(ExecutorLoader loader,
       EventCollectorListener eventCollector, String flowName) throws Exception {
+    return createFlowRunner(loader, eventCollector, flowName, new Props());
+  }
+
+  private FlowRunner createFlowRunner(ExecutorLoader loader,
+      EventCollectorListener eventCollector, String flowName, Props azkabanProps) throws Exception {
     File testDir = new File("unit/executions/exectest1");
     ExecutableFlow exFlow = prepareExecDir(testDir, flowName, 1);
     // MockProjectLoader projectLoader = new MockProjectLoader(new
@@ -480,7 +491,7 @@ public class FlowRunnerTest {
     loader.uploadExecutableFlow(exFlow);
 
     FlowRunner runner =
-        new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+        new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
 
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 84416de..196fcd7 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -1403,6 +1403,11 @@ public class FlowRunnerTest2 {
 
   private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
       String flowName, FailureAction action) throws Exception {
+    return createFlowRunner(eventCollector, flowName, action, new Props());
+  }
+
+  private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
+      String flowName, FailureAction action, Props azkabanProps) throws Exception {
     Flow flow = flowMap.get(flowName);
 
     int exId = id++;
@@ -1420,7 +1425,7 @@ public class FlowRunnerTest2 {
 
     FlowRunner runner = new FlowRunner(
         fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader,
-        fakeProjectLoader, jobtypeManager);
+        fakeProjectLoader, jobtypeManager, azkabanProps);
 
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 95406f5..479441a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -381,6 +381,11 @@ public class JobRunnerTest {
 
   private JobRunner createJobRunner(int execId, String name, int time,
       boolean fail, ExecutorLoader loader, EventCollectorListener listener) {
+    return createJobRunner(execId, name, time, fail, loader, listener, new Props());
+  }
+
+  private JobRunner createJobRunner(int execId, String name, int time,
+      boolean fail, ExecutorLoader loader, EventCollectorListener listener, Props azkabanProps) {
     ExecutableFlow flow = new ExecutableFlow();
     flow.setExecutionId(execId);
     ExecutableNode node = new ExecutableNode();
@@ -391,7 +396,7 @@ public class JobRunnerTest {
     node.setInputProps(props);
     HashSet<String> proxyUsers = new HashSet<String>();
     proxyUsers.add(flow.getSubmitUser());
-    JobRunner runner = new JobRunner(node, workingDir, loader, jobtypeManager);
+    JobRunner runner = new JobRunner(node, workingDir, loader, jobtypeManager, azkabanProps);
     runner.setLogSettings(logger, "5MB", 4);
 
     runner.addListener(listener);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 243512c..e490a7d 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -55,6 +55,7 @@ import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.alert.Alerter;
+import azkaban.constants.ServerInternals;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
@@ -67,7 +68,6 @@ import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
 import azkaban.server.AzkabanServer;
-import azkaban.server.Constants;
 import azkaban.server.session.SessionCache;
 import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
@@ -804,7 +804,7 @@ public class AzkabanWebServer extends AzkabanServer {
     // TODO: find something else to do the job
     app.getTriggerManager().start();
 
-    root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
+    root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, app);
     try {
       server.start();
     } catch (Exception e) {