azkaban-aplcache
Changes
azkaban-exec-server/build.gradle 2(+2 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/constants/FlowProperties.java b/azkaban-common/src/main/java/azkaban/constants/FlowProperties.java
new file mode 100644
index 0000000..c41b925
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/constants/FlowProperties.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.constants;
+
+public class FlowProperties {
+ // Basic properties of flows as set by the executor server
+ public static final String AZKABAN_FLOW_PROJECT_NAME = "azkaban.flow.projectname";
+ public static final String AZKABAN_FLOW_FLOW_ID = "azkaban.flow.flowid";
+ public static final String AZKABAN_FLOW_SUBMIT_USER = "azkaban.flow.submituser";
+ public static final String AZKABAN_FLOW_EXEC_ID = "azkaban.flow.execid";
+ public static final String AZKABAN_FLOW_PROJECT_VERSION = "azkaban.flow.projectversion";
+}
diff --git a/azkaban-common/src/main/java/azkaban/constants/JobProperties.java b/azkaban-common/src/main/java/azkaban/constants/JobProperties.java
new file mode 100644
index 0000000..17f2c42
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/constants/JobProperties.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.constants;
+
+public class JobProperties {
+ // Job property that enables/disables using Kafka logging of user job logs
+ public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
+}
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
new file mode 100644
index 0000000..7aa56b2
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.constants;
+
+public class ServerProperties {
+ // These properties are configurable through azkaban.properties
+
+ // Configures the Kafka appender for logging user jobs, specified for the exec server
+ public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
+ public static final String AZKABAN_SERVER_LOGGING_KAFKA_TOPIC = "azkaban.server.logging.kafka.topic";
+}
diff --git a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
index 9546fb9..c433cce 100644
--- a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
+++ b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
@@ -16,6 +16,8 @@
package azkaban.server;
+import azkaban.constants.ServerInternals;
+
import java.io.IOException;
import java.io.OutputStream;
@@ -37,7 +39,7 @@ public class AbstractServiceServlet extends HttpServlet {
@Override
public void init(ServletConfig config) throws ServletException {
application =
- (AzkabanServer) config.getServletContext().getAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY);
+ (AzkabanServer) config.getServletContext().getAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index d9ca84b..5f726c0 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -28,6 +28,7 @@ import joptsimple.OptionSpec;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
+import azkaban.constants.ServerInternals;
import azkaban.user.UserManager;
import azkaban.utils.Props;
import azkaban.server.session.SessionCache;
@@ -79,8 +80,8 @@ public abstract class AzkabanServer {
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
File azkabanPrivatePropsFile =
- new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
+ new File(dir, ServerInternals.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, ServerInternals.AZKABAN_PROPERTIES_FILE);
Props props = null;
try {
@@ -124,7 +125,7 @@ public abstract class AzkabanServer {
return null;
}
- File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
+ File confPath = new File(azkabanHome, ServerInternals.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
logger
.error(azkabanHome + " does not contain a readable conf directory.");
azkaban-exec-server/build.gradle 2(+2 -0)
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 93ca4f6..68e39ec 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -2,6 +2,8 @@ apply plugin: 'distribution'
dependencies {
compile(project(':azkaban-common'))
+ compile('org.apache.kafka:kafka-log4j-appender:0.10.0.0')
+ compile('com.googlecode.json-simple:json-simple:1.1.1')
testCompile('org.hamcrest:hamcrest-all:1.3')
testCompile(project(':azkaban-common').sourceSets.test.output)
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 5f832d5..21be9a4 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,6 +44,7 @@ import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import azkaban.constants.ServerInternals;
import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -64,12 +65,11 @@ import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.server.AzkabanServer;
-import azkaban.server.Constants;
import azkaban.utils.Props;
import azkaban.utils.SystemMemoryInfo;
import azkaban.utils.Utils;
-import static azkaban.server.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.constants.ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
@@ -171,7 +171,7 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
- root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
+ root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, this);
return server;
}
@@ -418,7 +418,7 @@ public class AzkabanExecutorServer {
return null;
}
- File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
+ File confPath = new File(azkabanHome, ServerInternals.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
logger
.error(azkabanHome + " does not contain a readable conf directory.");
@@ -439,8 +439,8 @@ public class AzkabanExecutorServer {
*/
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
File azkabanPrivatePropsFile =
- new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
+ new File(dir, ServerInternals.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, ServerInternals.AZKABAN_PROPERTIES_FILE);
Props props = null;
try {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 482d36f..3d618d2 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -35,12 +35,12 @@ import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
+import azkaban.constants.ServerInternals;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
-import azkaban.server.Constants;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
@@ -62,7 +62,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
public void init(ServletConfig config) throws ServletException {
application =
(AzkabanExecutorServer) config.getServletContext().getAttribute(
- Constants.AZKABAN_SERVLET_CONTEXT_KEY);
+ ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 3e90c13..1edbe80 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -96,6 +96,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private Object mainSyncObj = new Object();
// Properties map
+ private Props azkabanProps;
private Map<String, Props> sharedProps = new HashMap<String, Props>();
private final JobTypeManager jobtypeManager;
@@ -137,9 +138,9 @@ public class FlowRunner extends EventHandler implements Runnable {
* @throws ExecutorManagerException
*/
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, JobTypeManager jobtypeManager)
+ ProjectLoader projectLoader, JobTypeManager jobtypeManager, Props azkabanProps)
throws ExecutorManagerException {
- this(flow, executorLoader, projectLoader, jobtypeManager, null);
+ this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
}
/**
@@ -155,7 +156,7 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
ProjectLoader projectLoader, JobTypeManager jobtypeManager,
- ExecutorService executorService) throws ExecutorManagerException {
+ ExecutorService executorService, Props azkabanProps) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -170,6 +171,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.proxyUsers = flow.getProxyUsers();
this.executorService = executorService;
this.finishedNodes = new SwapQueue<ExecutableNode>();
+ this.azkabanProps = azkabanProps;
}
public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -820,7 +822,7 @@ public class FlowRunner extends EventHandler implements Runnable {
JobRunner jobRunner =
new JobRunner(node, path.getParentFile(), executorLoader,
- jobtypeManager);
+ jobtypeManager, azkabanProps);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 01ab373..eed51ad 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -497,7 +497,7 @@ public class FlowRunnerManager implements EventListener,
}
FlowRunner runner =
- new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
+ new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager, azkabanProps);
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
index e3d42cd..2141507 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
@@ -32,9 +32,9 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
+import azkaban.constants.ServerInternals;
import azkaban.executor.ConnectorParams;
import azkaban.server.HttpRequestUtils;
-import azkaban.server.Constants;
import azkaban.utils.JSONUtils;
public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
@@ -45,7 +45,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
public void init(ServletConfig config) throws ServletException {
server =
(AzkabanExecutorServer) config.getServletContext().getAttribute(
- Constants.AZKABAN_SERVLET_CONTEXT_KEY);
+ ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
}
public boolean hasParam(HttpServletRequest request, String param) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index eaa0bbc..dbd5b80 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -24,13 +24,22 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.Optional;
import org.apache.log4j.Appender;
import org.apache.log4j.EnhancedPatternLayout;
+import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.RollingFileAppender;
+import org.apache.kafka.log4jappender.KafkaLog4jAppender;
+
+import org.json.simple.JSONObject;
+
+import azkaban.constants.FlowProperties;
+import azkaban.constants.JobProperties;
+import azkaban.constants.ServerProperties;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventData;
@@ -50,6 +59,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.Props;
import azkaban.utils.StringUtils;
+import azkaban.utils.UndefinedPropertyException;
public class JobRunner extends EventHandler implements Runnable {
public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
@@ -59,6 +69,7 @@ public class JobRunner extends EventHandler implements Runnable {
private ExecutorLoader loader;
private Props props;
+ private Props azkabanProps;
private ExecutableNode node;
private File workingDir;
@@ -66,7 +77,8 @@ public class JobRunner extends EventHandler implements Runnable {
private Layout loggerLayout = DEFAULT_LAYOUT;
private Logger flowLogger = null;
- private Appender jobAppender;
+ private Appender jobAppender = null;
+ private Optional<Appender> kafkaAppender = Optional.empty();
private File logFile;
private String attachmentFileName;
@@ -94,7 +106,7 @@ public class JobRunner extends EventHandler implements Runnable {
private BlockingStatus currentBlockStatus = null;
public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader,
- JobTypeManager jobtypeManager) {
+ JobTypeManager jobtypeManager, Props azkabanProps) {
this.props = node.getInputProps();
this.node = node;
this.workingDir = workingDir;
@@ -103,6 +115,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.jobId = node.getId();
this.loader = loader;
this.jobtypeManager = jobtypeManager;
+ this.azkabanProps = azkabanProps;
}
public void setValidatedProxyUsers(Set<String> proxyUsers) {
@@ -210,38 +223,116 @@ public class JobRunner extends EventHandler implements Runnable {
+ this.jobId;
logger = Logger.getLogger(loggerName);
- // Create file appender
- String logName = createLogFileName(node);
- logFile = new File(workingDir, logName);
-
- String absolutePath = logFile.getAbsolutePath();
-
- jobAppender = null;
try {
- RollingFileAppender fileAppender =
- new RollingFileAppender(loggerLayout, absolutePath, true);
- fileAppender.setMaxBackupIndex(jobLogBackupIndex);
- fileAppender.setMaxFileSize(jobLogChunkSize);
- jobAppender = fileAppender;
- logger.addAppender(jobAppender);
- logger.setAdditivity(false);
+ attachFileAppender(createFileAppender());
} catch (IOException e) {
+ removeAppender(jobAppender);
flowLogger.error("Could not open log file in " + workingDir
+ " for job " + this.jobId, e);
}
+
+ if (props.getBoolean(JobProperties.AZKABAN_JOB_LOGGING_KAFKA_ENABLE, false)) {
+ // Only attempt appender construction if required properties are present
+ if (azkabanProps.containsKey(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST)
+ && azkabanProps.containsKey(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC)) {
+ try {
+ attachKafkaAppender(createKafkaAppender());
+ } catch (Exception e) {
+ removeAppender(kafkaAppender);
+ flowLogger.error("Failed to create Kafka appender for job " + this.jobId, e);
+ }
+ } else {
+ flowLogger.info("Kafka appender not created as brokerlist or topic not provided by executor server");
+ }
+ }
}
}
+ private void attachFileAppender(FileAppender appender) {
+ // If present, remove the existing file appender
+ assert(jobAppender == null);
+
+ jobAppender = appender;
+ logger.addAppender(jobAppender);
+ logger.setAdditivity(false);
+ flowLogger.info("Attached file appender for job " + this.jobId);
+ }
+
+ private FileAppender createFileAppender() throws IOException {
+ // Set up log files
+ String logName = createLogFileName(node);
+ logFile = new File(workingDir, logName);
+ String absolutePath = logFile.getAbsolutePath();
+
+ // Attempt to create FileAppender
+ RollingFileAppender fileAppender =
+ new RollingFileAppender(loggerLayout, absolutePath, true);
+ fileAppender.setMaxBackupIndex(jobLogBackupIndex);
+ fileAppender.setMaxFileSize(jobLogChunkSize);
+
+ flowLogger.info("Created file appender for job " + this.jobId);
+ return fileAppender;
+ }
+
private void createAttachmentFile() {
String fileName = createAttachmentFileName(node);
File file = new File(workingDir, fileName);
attachmentFileName = file.getAbsolutePath();
}
+ private void attachKafkaAppender(KafkaLog4jAppender appender) {
+ // This should only be called once
+ assert(!kafkaAppender.isPresent());
+
+ kafkaAppender = Optional.of(appender);
+ logger.addAppender(kafkaAppender.get());
+ logger.setAdditivity(false);
+ flowLogger.info("Attached new Kafka appender for job " + this.jobId);
+ }
+
+ private KafkaLog4jAppender createKafkaAppender() throws UndefinedPropertyException {
+ KafkaLog4jAppender kafkaProducer = new KafkaLog4jAppender();
+ kafkaProducer.setSyncSend(false);
+ kafkaProducer.setBrokerList(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST));
+ kafkaProducer.setTopic(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
+
+ JSONObject layout = new JSONObject();
+ layout.put("category", "%c{1}");
+ layout.put("level", "%p");
+ layout.put("message", "%m");
+ layout.put("projectname", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
+ layout.put("flowid", props.getString(FlowProperties.AZKABAN_FLOW_FLOW_ID));
+ layout.put("submituser", props.getString(FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
+ layout.put("execid", props.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
+ layout.put("projectversion", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
+ layout.put("logsource", "userJob");
+
+ kafkaProducer.setLayout(new EnhancedPatternLayout(layout.toString()));
+ kafkaProducer.activateOptions();
+
+ flowLogger.info("Created kafka appender for " + this.jobId);
+ return kafkaProducer;
+ }
+
+ private void removeAppender(Optional<Appender> appender) {
+ if (appender.isPresent()) {
+ removeAppender(appender.get());
+ }
+ }
+
+ private void removeAppender(Appender appender) {
+ if (appender != null) {
+ logger.removeAppender(appender);
+ appender.close();
+ }
+ }
+
private void closeLogger() {
if (jobAppender != null) {
- logger.removeAppender(jobAppender);
- jobAppender.close();
+ removeAppender(jobAppender);
+ }
+ if (kafkaAppender.isPresent()) {
+ removeAppender(kafkaAppender);
}
}
@@ -556,8 +647,7 @@ public class JobRunner extends EventHandler implements Runnable {
* know what executions initiated their execution.
*/
private void insertJobMetadata() {
- Props azkProps = AzkabanExecutorServer.getApp().getAzkabanProps();
- String baseURL = azkProps.get(AZKABAN_WEBSERVER_URL);
+ String baseURL = azkabanProps.get(AZKABAN_WEBSERVER_URL);
if (baseURL != null) {
String flowName = node.getParentFlow().getFlowId();
String projectName = node.getParentFlow().getProjectName();
@@ -754,4 +844,4 @@ public class JobRunner extends EventHandler implements Runnable {
return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId
+ ".attach" : "_job." + executionId + "." + jobId + ".attach";
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index a75f9b6..0570435 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -43,6 +43,7 @@ import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.MockProjectLoader;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
public class LocalFlowWatcherTest {
private File workingDir;
@@ -237,6 +238,12 @@ public class LocalFlowWatcherTest {
private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
EventCollectorListener eventCollector, String flowName, int execId,
FlowWatcher watcher, Integer pipeline) throws Exception {
+ return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline, new Props());
+ }
+
+ private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
+ EventCollectorListener eventCollector, String flowName, int execId,
+ FlowWatcher watcher, Integer pipeline, Props azkabanProps) throws Exception {
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow =
prepareExecDir(workingDir, testDir, flowName, execId);
@@ -250,7 +257,7 @@ public class LocalFlowWatcherTest {
loader.uploadExecutableFlow(exFlow);
FlowRunner runner =
- new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 4cbc794..2b7197c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -44,6 +44,7 @@ import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.MockProjectLoader;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
public class RemoteFlowWatcherTest {
private File workingDir;
@@ -238,6 +239,12 @@ public class RemoteFlowWatcherTest {
private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
EventCollectorListener eventCollector, String flowName, int execId,
FlowWatcher watcher, Integer pipeline) throws Exception {
+ return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline, new Props());
+ }
+
+ private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader,
+ EventCollectorListener eventCollector, String flowName, int execId,
+ FlowWatcher watcher, Integer pipeline, Props azkabanProps) throws Exception {
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow =
prepareExecDir(workingDir, testDir, flowName, execId);
@@ -251,7 +258,7 @@ public class RemoteFlowWatcherTest {
loader.uploadExecutableFlow(exFlow);
FlowRunner runner =
- new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index e29e973..c53cf99 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -677,11 +677,17 @@ public class FlowRunnerPipelineTest {
private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
String flowName, String groupName) throws Exception {
return createFlowRunner(eventCollector, flowName, groupName,
- new ExecutionOptions());
+ new ExecutionOptions(), new Props());
}
private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
- String flowName, String groupName, ExecutionOptions options)
+ String flowName, String groupName, ExecutionOptions options) throws Exception {
+ return createFlowRunner(eventCollector, flowName, groupName,
+ options, new Props());
+ }
+
+ private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
+ String flowName, String groupName, ExecutionOptions options, Props azkabanProps)
throws Exception {
Flow flow = flowMap.get(flowName);
@@ -698,7 +704,7 @@ public class FlowRunnerPipelineTest {
FlowRunner runner =
new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId),
- fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+ fakeExecutorLoader, fakeProjectLoader, jobtypeManager, azkabanProps);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 422f622..f79f2fe 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -226,6 +226,11 @@ public class FlowRunnerPropertyResolutionTest {
private FlowRunner createFlowRunner(String flowName,
HashMap<String, String> flowParams) throws Exception {
+ return createFlowRunner(flowName, flowParams, new Props());
+ }
+
+ private FlowRunner createFlowRunner(String flowName,
+ HashMap<String, String> flowParams, Props azkabanProps) throws Exception {
Flow flow = flowMap.get(flowName);
int exId = id++;
@@ -238,7 +243,7 @@ public class FlowRunnerPropertyResolutionTest {
FlowRunner runner =
new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId),
- fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+ fakeExecutorLoader, fakeProjectLoader, jobtypeManager, azkabanProps);
return runner;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 8b2c6d4..c4289e4 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -45,6 +45,7 @@ import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.MockProjectLoader;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
public class FlowRunnerTest {
private File workingDir;
@@ -455,7 +456,12 @@ public class FlowRunnerTest {
}
private FlowRunner createFlowRunner(ExecutableFlow flow,
- ExecutorLoader loader, EventCollectorListener eventCollector)
+ ExecutorLoader loader, EventCollectorListener eventCollector) throws Exception {
+ return createFlowRunner(flow, loader, eventCollector, new Props());
+ }
+
+ private FlowRunner createFlowRunner(ExecutableFlow flow,
+ ExecutorLoader loader, EventCollectorListener eventCollector, Props azkabanProps)
throws Exception {
// File testDir = new File("unit/executions/exectest1");
// MockProjectLoader projectLoader = new MockProjectLoader(new
@@ -463,7 +469,7 @@ public class FlowRunnerTest {
loader.uploadExecutableFlow(flow);
FlowRunner runner =
- new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager);
+ new FlowRunner(flow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
runner.addListener(eventCollector);
@@ -472,6 +478,11 @@ public class FlowRunnerTest {
private FlowRunner createFlowRunner(ExecutorLoader loader,
EventCollectorListener eventCollector, String flowName) throws Exception {
+ return createFlowRunner(loader, eventCollector, flowName, new Props());
+ }
+
+ private FlowRunner createFlowRunner(ExecutorLoader loader,
+ EventCollectorListener eventCollector, String flowName, Props azkabanProps) throws Exception {
File testDir = new File("unit/executions/exectest1");
ExecutableFlow exFlow = prepareExecDir(testDir, flowName, 1);
// MockProjectLoader projectLoader = new MockProjectLoader(new
@@ -480,7 +491,7 @@ public class FlowRunnerTest {
loader.uploadExecutableFlow(exFlow);
FlowRunner runner =
- new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager);
+ new FlowRunner(exFlow, loader, fakeProjectLoader, jobtypeManager, azkabanProps);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 84416de..196fcd7 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -1403,6 +1403,11 @@ public class FlowRunnerTest2 {
private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
String flowName, FailureAction action) throws Exception {
+ return createFlowRunner(eventCollector, flowName, action, new Props());
+ }
+
+ private FlowRunner createFlowRunner(EventCollectorListener eventCollector,
+ String flowName, FailureAction action, Props azkabanProps) throws Exception {
Flow flow = flowMap.get(flowName);
int exId = id++;
@@ -1420,7 +1425,7 @@ public class FlowRunnerTest2 {
FlowRunner runner = new FlowRunner(
fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader,
- fakeProjectLoader, jobtypeManager);
+ fakeProjectLoader, jobtypeManager, azkabanProps);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 95406f5..479441a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -381,6 +381,11 @@ public class JobRunnerTest {
private JobRunner createJobRunner(int execId, String name, int time,
boolean fail, ExecutorLoader loader, EventCollectorListener listener) {
+ return createJobRunner(execId, name, time, fail, loader, listener, new Props());
+ }
+
+ private JobRunner createJobRunner(int execId, String name, int time,
+ boolean fail, ExecutorLoader loader, EventCollectorListener listener, Props azkabanProps) {
ExecutableFlow flow = new ExecutableFlow();
flow.setExecutionId(execId);
ExecutableNode node = new ExecutableNode();
@@ -391,7 +396,7 @@ public class JobRunnerTest {
node.setInputProps(props);
HashSet<String> proxyUsers = new HashSet<String>();
proxyUsers.add(flow.getSubmitUser());
- JobRunner runner = new JobRunner(node, workingDir, loader, jobtypeManager);
+ JobRunner runner = new JobRunner(node, workingDir, loader, jobtypeManager, azkabanProps);
runner.setLogSettings(logger, "5MB", 4);
runner.addListener(listener);
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 243512c..e490a7d 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -55,6 +55,7 @@ import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
import azkaban.alert.Alerter;
+import azkaban.constants.ServerInternals;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
@@ -67,7 +68,6 @@ import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
import azkaban.server.AzkabanServer;
-import azkaban.server.Constants;
import azkaban.server.session.SessionCache;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
@@ -804,7 +804,7 @@ public class AzkabanWebServer extends AzkabanServer {
// TODO: find something else to do the job
app.getTriggerManager().start();
- root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
+ root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, app);
try {
server.start();
} catch (Exception e) {