azkaban-developers
Changes
azkaban-common/src/main/java/azkaban/Constants.java 106(+73 -33)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a018542..88e5285 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,8 +16,8 @@
package azkaban.executor;
+import azkaban.Constants;
import azkaban.metrics.CommonMetrics;
-import azkaban.constants.ServerProperties;
import azkaban.utils.FlowUtils;
import com.google.common.collect.Lists;
import java.io.File;
@@ -200,7 +200,7 @@ public class ExecutorManager extends EventHandler implements
newExecutors.addAll(executorLoader.fetchActiveExecutors());
} else if (azkProps.containsKey("executor.port")) {
// Add local executor, if specified as per properties
- String executorHost = azkProps.getString(ServerProperties.EXECUTOR_HOST, "localhost");
+ String executorHost = azkProps.getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
int executorPort = azkProps.getInt("executor.port");
logger.info(String.format("Initializing local executor %s:%d",
executorHost, executorPort));
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4029d46..221e5f3 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -16,7 +16,7 @@
package azkaban.jobExecutor;
-import azkaban.constants.ServerInternals;
+import azkaban.Constants;
import azkaban.metrics.CommonMetrics;
import java.io.File;
import java.util.ArrayList;
@@ -81,7 +81,7 @@ public class ProcessJob extends AbstractProcessJob {
memPair.getFirst(), memPair.getSecond(), getId());
int attempt;
boolean isMemGranted = true;
- for(attempt = 1; attempt <= ServerInternals.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
+ for(attempt = 1; attempt <= Constants.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
if (isMemGranted) {
info(String.format("Memory granted (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(), memPair.getSecond(), getId()));
@@ -90,14 +90,15 @@ public class ProcessJob extends AbstractProcessJob {
}
break;
}
- if (attempt < ServerInternals.MEMORY_CHECK_RETRY_LIMIT) {
- info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL_MS), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+ if (attempt < Constants.MEMORY_CHECK_RETRY_LIMIT) {
+ info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(
+ Constants.MEMORY_CHECK_INTERVAL_MS), attempt, Constants.MEMORY_CHECK_RETRY_LIMIT));
if (attempt == 1) {
CommonMetrics.INSTANCE.incrementOOMJobWaitCount();
}
synchronized (this) {
try {
- this.wait(ServerInternals.MEMORY_CHECK_INTERVAL_MS);
+ this.wait(Constants.MEMORY_CHECK_INTERVAL_MS);
} catch (InterruptedException e) {
info(String.format("Job %s interrupted while waiting for memory check retry", getId()));
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
index 447c613..d7b1950 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -17,8 +17,8 @@
package azkaban.metrics;
import azkaban.utils.Props;
-import static azkaban.constants.ServerProperties.METRICS_SERVER_URL;
-import static azkaban.constants.ServerProperties.CUSTOM_METRICS_REPORTER_CLASS_NAME;
+import static azkaban.Constants.ConfigurationKeys.METRICS_SERVER_URL;
+import static azkaban.Constants.ConfigurationKeys.CUSTOM_METRICS_REPORTER_CLASS_NAME;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ConsoleReporter;
diff --git a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
index c433cce..63af64a 100644
--- a/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
+++ b/azkaban-common/src/main/java/azkaban/server/AbstractServiceServlet.java
@@ -16,7 +16,7 @@
package azkaban.server;
-import azkaban.constants.ServerInternals;
+import azkaban.Constants;
import java.io.IOException;
import java.io.OutputStream;
@@ -39,7 +39,7 @@ public class AbstractServiceServlet extends HttpServlet {
@Override
public void init(ServletConfig config) throws ServletException {
application =
- (AzkabanServer) config.getServletContext().getAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
+ (AzkabanServer) config.getServletContext().getAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index 5f726c0..c0621a0 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -28,7 +28,7 @@ import joptsimple.OptionSpec;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
-import azkaban.constants.ServerInternals;
+import azkaban.Constants;
import azkaban.user.UserManager;
import azkaban.utils.Props;
import azkaban.server.session.SessionCache;
@@ -80,8 +80,8 @@ public abstract class AzkabanServer {
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
File azkabanPrivatePropsFile =
- new File(dir, ServerInternals.AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, ServerInternals.AZKABAN_PROPERTIES_FILE);
+ new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
Props props = null;
try {
@@ -125,7 +125,7 @@ public abstract class AzkabanServer {
return null;
}
- File confPath = new File(azkabanHome, ServerInternals.DEFAULT_CONF_PATH);
+ File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
logger
.error(azkabanHome + " does not contain a readable conf directory.");
diff --git a/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java b/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
index 53ec8fc..4d9db49 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ExternalLinkUtils.java
@@ -16,8 +16,7 @@
package azkaban.utils;
-import azkaban.constants.FlowProperties;
-import azkaban.constants.ServerProperties;
+import azkaban.Constants;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -31,21 +30,21 @@ public class ExternalLinkUtils {
public static String getExternalAnalyzerOnReq(Props azkProps, HttpServletRequest req) {
// If no topic was configured to be an external analyzer, return empty
- if (!azkProps.containsKey(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC)) {
+ if (!azkProps.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC)) {
return "";
}
// Find out which external link we should use to lead to our analyzer
- String topic = azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC);
+ String topic = azkProps.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC);
return getLinkFromRequest(topic, azkProps, req);
}
public static String getExternalLogViewer(Props azkProps, String jobId, Props jobProps) {
// If no topic was configured to be an external analyzer, return empty
- if (!azkProps.containsKey(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC)) {
+ if (!azkProps.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC)) {
return "";
}
// Find out which external link we should use to lead to our log viewer
- String topic = azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC);
+ String topic = azkProps.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC);
return getLinkFromJobAndExecId(topic, azkProps, jobId, jobProps);
}
@@ -56,7 +55,7 @@ public class ExternalLinkUtils {
return "";
}
String job = encodeToUTF8(jobId);
- String execid = encodeToUTF8(jobProps.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
+ String execid = encodeToUTF8(jobProps.getString(Constants.FlowProperties.AZKABAN_FLOW_EXEC_ID));
urlTemplate = urlTemplate.replace("${jobid}", job).replace("${execid}", execid);
logger.info("Creating link: " + urlTemplate);
@@ -81,7 +80,7 @@ public class ExternalLinkUtils {
}
static String getURLForTopic(String topic, Props azkProps) {
- return azkProps.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", topic), "");
+ return azkProps.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", topic), "");
}
static String encodeToUTF8(String url) {
diff --git a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
index a0485af..957759b 100644
--- a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
@@ -20,8 +20,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import azkaban.constants.FlowProperties;
-import azkaban.constants.ServerProperties;
+import azkaban.Constants;
import javax.servlet.http.HttpServletRequest;
@@ -65,7 +64,7 @@ public class ExternalLinkUtilsTest {
// Job configuration consisting of only an exec id and job id
jobProps = new Props();
- jobProps.put(FlowProperties.AZKABAN_FLOW_EXEC_ID, 1);
+ jobProps.put(Constants.FlowProperties.AZKABAN_FLOW_EXEC_ID, 1);
jobId = "Some + job";
mockRequest = mock(HttpServletRequest.class);
@@ -77,8 +76,9 @@ public class ExternalLinkUtilsTest {
*/
@Test
public void testGetExternalAnalyzerValidFormat() {
- azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC, EXTERNAL_ANALYZER_TOPIC);
- azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_ANALYZER_TOPIC),
+ azkProps.put(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_ANALYZER_TOPIC, EXTERNAL_ANALYZER_TOPIC);
+ azkProps.put(
+ Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_ANALYZER_TOPIC),
EXTERNAL_ANALYZER_URL_VALID_FORMAT);
when(mockRequest.getRequestURL()).thenReturn(new StringBuffer(EXEC_URL));
@@ -95,8 +95,9 @@ public class ExternalLinkUtilsTest {
*/
@Test
public void testGetExternalLogViewerValidFormat() {
- azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC, EXTERNAL_LOGVIEWER_TOPIC);
- azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_LOGVIEWER_TOPIC),
+ azkProps.put(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_LOGVIEWER_TOPIC, EXTERNAL_LOGVIEWER_TOPIC);
+ azkProps.put(
+ Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", EXTERNAL_LOGVIEWER_TOPIC),
EXTERNAL_LOGVIEWER_URL_VALID_FORMAT);
String externalURL =
@@ -144,7 +145,7 @@ public class ExternalLinkUtilsTest {
*/
@Test
public void testFetchURL() {
- azkProps.put(ServerProperties.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", "someTopic"), "This is a link");
+ azkProps.put(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_TOPIC_URL.replace("${topic}", "someTopic"), "This is a link");
assertTrue(ExternalLinkUtils.getURLForTopic("someTopic", azkProps).equals("This is a link"));
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 828c615..c1bdf14 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -45,8 +45,7 @@ import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import azkaban.constants.ServerInternals;
-import azkaban.constants.ServerProperties;
+import azkaban.Constants;
import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
@@ -74,7 +73,7 @@ import azkaban.utils.SystemMemoryInfo;
import azkaban.utils.Utils;
import azkaban.metrics.MetricsManager;
-import static azkaban.constants.ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
@@ -138,7 +137,7 @@ public class AzkabanExecutorServer {
logger.info("Started Executor Server on " + getExecutorHostPort());
- if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+ if (props.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
startExecMetrics();
}
}
@@ -178,7 +177,7 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
- root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, this);
+ root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, this);
return server;
}
@@ -439,7 +438,7 @@ public class AzkabanExecutorServer {
return null;
}
- File confPath = new File(azkabanHome, ServerInternals.DEFAULT_CONF_PATH);
+ File confPath = new File(azkabanHome, Constants.DEFAULT_CONF_PATH);
if (!confPath.exists() || !confPath.isDirectory() || !confPath.canRead()) {
logger
.error(azkabanHome + " does not contain a readable conf directory.");
@@ -460,8 +459,8 @@ public class AzkabanExecutorServer {
*/
private static Props loadAzkabanConfigurationFromDirectory(File dir) {
File azkabanPrivatePropsFile =
- new File(dir, ServerInternals.AZKABAN_PRIVATE_PROPERTIES_FILE);
- File azkabanPropsFile = new File(dir, ServerInternals.AZKABAN_PROPERTIES_FILE);
+ new File(dir, Constants.AZKABAN_PRIVATE_PROPERTIES_FILE);
+ File azkabanPropsFile = new File(dir, Constants.AZKABAN_PROPERTIES_FILE);
Props props = null;
try {
@@ -556,8 +555,8 @@ public class AzkabanExecutorServer {
* @return hostname
*/
public String getHost() {
- if(props.containsKey(ServerProperties.AZKABAN_SERVER_HOST_NAME)) {
- String hostName = props.getString(ServerProperties.AZKABAN_SERVER_HOST_NAME);
+ if(props.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME)) {
+ String hostName = props.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME);
if(!StringUtils.isEmpty(hostName)) {
return hostName;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 441056c..2e6bdb0 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
-import azkaban.constants.ServerInternals;
+import azkaban.Constants;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.Executor;
@@ -65,7 +65,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
public void init(ServletConfig config) throws ServletException {
application =
(AzkabanExecutorServer) config.getServletContext().getAttribute(
- ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
+ Constants.AZKABAN_SERVLET_CONTEXT_KEY);
if (application == null) {
throw new IllegalStateException(
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3d5c257..a9932fa 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,7 +16,7 @@
package azkaban.execapp;
-import azkaban.constants.ServerProperties;
+import azkaban.Constants;
import azkaban.executor.Status;
import java.io.File;
import java.io.FilenameFilter;
@@ -286,7 +286,7 @@ public class FlowRunnerManager implements EventListener,
private long lastOldProjectCleanTime = -1;
private long lastRecentlyFinishedCleanTime = -1;
private long lastLongRunningFlowCleanTime = -1;
- private final long flowMaxRunningTimeInMins = azkabanProps.getInt(ServerProperties.AZKABAN_MAX_FLOW_RUNNING_MINS, 60 * 24 * 10);
+ private final long flowMaxRunningTimeInMins = azkabanProps.getInt(Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, 60 * 24 * 10);
public CleanerThread() {
this.setName("FlowRunnerManager-Cleaner-Thread");
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
index 2141507..4abdc95 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JMXHttpServlet.java
@@ -32,7 +32,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
-import azkaban.constants.ServerInternals;
+import azkaban.Constants;
import azkaban.executor.ConnectorParams;
import azkaban.server.HttpRequestUtils;
import azkaban.utils.JSONUtils;
@@ -45,7 +45,7 @@ public class JMXHttpServlet extends HttpServlet implements ConnectorParams {
public void init(ServletConfig config) throws ServletException {
server =
(AzkabanExecutorServer) config.getServletContext().getAttribute(
- ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY);
+ Constants.AZKABAN_SERVLET_CONTEXT_KEY);
}
public boolean hasParam(HttpServletRequest request, String param) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 0c17d1b..2445920 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -16,6 +16,7 @@
package azkaban.execapp;
+import azkaban.Constants;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -37,9 +38,6 @@ import org.apache.kafka.log4jappender.KafkaLog4jAppender;
import org.json.simple.JSONObject;
-import azkaban.constants.FlowProperties;
-import azkaban.constants.JobProperties;
-import azkaban.constants.ServerProperties;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventData;
@@ -233,10 +231,10 @@ public class JobRunner extends EventHandler implements Runnable {
+ " for job " + this.jobId, e);
}
- if (props.getBoolean(JobProperties.AZKABAN_JOB_LOGGING_KAFKA_ENABLE, false)) {
+ if (props.getBoolean(Constants.JobProperties.AZKABAN_JOB_LOGGING_KAFKA_ENABLE, false)) {
// Only attempt appender construction if required properties are present
- if (azkabanProps.containsKey(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST)
- && azkabanProps.containsKey(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC)) {
+ if (azkabanProps.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST)
+ && azkabanProps.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC)) {
try {
attachKafkaAppender(createKafkaAppender());
} catch (Exception e) {
@@ -300,19 +298,19 @@ public class JobRunner extends EventHandler implements Runnable {
private KafkaLog4jAppender createKafkaAppender() throws UndefinedPropertyException {
KafkaLog4jAppender kafkaProducer = new KafkaLog4jAppender();
kafkaProducer.setSyncSend(false);
- kafkaProducer.setBrokerList(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST));
- kafkaProducer.setTopic(azkabanProps.getString(ServerProperties.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
+ kafkaProducer.setBrokerList(azkabanProps.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST));
+ kafkaProducer.setTopic(azkabanProps.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
JSONObject layout = new JSONObject();
layout.put("category", "%c{1}");
layout.put("level", "%p");
layout.put("message", "%m");
- layout.put("projectname", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
- layout.put("flowid", props.getString(FlowProperties.AZKABAN_FLOW_FLOW_ID));
+ layout.put("projectname", props.getString(Constants.FlowProperties.AZKABAN_FLOW_PROJECT_NAME));
+ layout.put("flowid", props.getString(Constants.FlowProperties.AZKABAN_FLOW_FLOW_ID));
layout.put("jobid", this.jobId);
- layout.put("submituser", props.getString(FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
- layout.put("execid", props.getString(FlowProperties.AZKABAN_FLOW_EXEC_ID));
- layout.put("projectversion", props.getString(FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
+ layout.put("submituser", props.getString(Constants.FlowProperties.AZKABAN_FLOW_SUBMIT_USER));
+ layout.put("execid", props.getString(Constants.FlowProperties.AZKABAN_FLOW_EXEC_ID));
+ layout.put("projectversion", props.getString(Constants.FlowProperties.AZKABAN_FLOW_PROJECT_VERSION));
layout.put("logsource", "userJob");
kafkaProducer.setLayout(new PatternLayoutEscaped(layout.toString()));
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index dda823c..8a9e4fb 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -34,7 +34,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
-import java.util.function.Supplier;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
@@ -58,8 +57,7 @@ import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
import azkaban.alert.Alerter;
-import azkaban.constants.ServerInternals;
-import azkaban.constants.ServerProperties;
+import azkaban.Constants;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
@@ -847,10 +845,10 @@ public class AzkabanWebServer extends AzkabanServer {
// TODO: find something else to do the job
app.getTriggerManager().start();
- root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, app);
+ root.setAttribute(Constants.AZKABAN_SERVLET_CONTEXT_KEY, app);
- if (azkabanSettings.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+ if (azkabanSettings.getBoolean(Constants.ConfigurationKeys.IS_METRICS_ENABLED, false)) {
app.startWebMetrics();
}
try {
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 225df57..30b2c4f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -16,7 +16,7 @@
package azkaban.webapp.servlet;
-import azkaban.constants.ServerProperties;
+import azkaban.Constants;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
@@ -391,7 +391,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
LOGGER.debug("External analyzer url: " + execExternalLinkURL);
String execExternalLinkLabel =
- props.getString(ServerProperties.AZKABAN_SERVER_EXTERNAL_ANALYZER_LABEL,
+ props.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_EXTERNAL_ANALYZER_LABEL,
"External Analyzer");
page.add("executionExternalLinkLabel", execExternalLinkLabel);
LOGGER.debug("External analyzer label set to : " + execExternalLinkLabel);