azkaban-aplcache

Move ExecutorManager props to Constants (#1660) As requested

3/9/2018 8:22:31 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index d944eed..847d7d3 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -195,6 +195,21 @@ public class Constants {
 
     // dir to keep dependency plugins
     public static final String DEPENDENCY_PLUGIN_DIR = "azkaban.dependency.plugin.dir";
+
+    public static final String USE_MULTIPLE_EXECUTORS = "azkaban.use.multiple.executors";
+    public static final String MAX_CONCURRENT_RUNS_ONEFLOW = "azkaban.max.concurrent.runs.oneflow";
+    public static final String WEBSERVER_QUEUE_SIZE = "azkaban.webserver.queue.size";
+    public static final String ACTIVE_EXECUTOR_REFRESH_IN_MS =
+        "azkaban.activeexecutor.refresh.milisecinterval";
+    public static final String ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
+        "azkaban.activeexecutor.refresh.flowinterval";
+    public static final String EXECUTORINFO_REFRESH_MAX_THREADS =
+        "azkaban.executorinfo.refresh.maxThreads";
+    public static final String MAX_DISPATCHING_ERRORS_PERMITTED = "azkaban.maxDispatchingErrors";
+    public static final String EXECUTOR_SELECTOR_FILTERS = "azkaban.executorselector.filters";
+    public static final String EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+        "azkaban.executorselector.comparator.";
+    public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 267cc9a..6cf0fa8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -65,27 +65,7 @@ import org.joda.time.DateTime;
 public class ExecutorManager extends EventHandler implements
     ExecutorManagerAdapter {
 
-  public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
-      "azkaban.use.multiple.executors";
-  public static final String AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW =
-      "azkaban.max.concurrent.runs.oneflow";
-  static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
-      "azkaban.executorselector.filters";
-  static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
-      "azkaban.executorselector.comparator.";
-  static final String AZKABAN_QUEUEPROCESSING_ENABLED =
-      "azkaban.queueprocessing.enabled";
-  private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
-      "azkaban.webserver.queue.size";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
-      "azkaban.activeexecutor.refresh.milisecinterval";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
-      "azkaban.activeexecutor.refresh.flowinterval";
   private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
-  private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
-      "azkaban.executorinfo.refresh.maxThreads";
-  private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
-      "azkaban.maxDispatchingErrors";
   // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
       * 24 * 60 * 60 * 1000L;
@@ -127,12 +107,14 @@ public class ExecutorManager extends EventHandler implements
     this.setupExecutors();
     this.loadRunningFlows();
 
-    this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+    this.queuedFlows = new QueuedExecutions(
+        azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
 
     // The default threshold is set to 30 for now, in case some users are affected. We may
     // decrease this number in future, to better prevent DDos attacks.
-    this.maxConcurrentRunsOneFlow = azkProps.getInt(AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW,
-        DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
+    this.maxConcurrentRunsOneFlow = azkProps
+        .getInt(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
+            DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
     this.loadQueuedFlows();
 
     this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
@@ -155,15 +137,16 @@ public class ExecutorManager extends EventHandler implements
 
   private void setupMultiExecutorMode() {
     // initliatize hard filters for executor selector from azkaban.properties
-    final String filters = this.azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+    final String filters = this.azkProps
+        .getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
     if (filters != null) {
       this.filterList = Arrays.asList(StringUtils.split(filters, ","));
     }
 
     // initliatize comparator feature weights for executor selector from
     // azkaban.properties
-    final Map<String, String> compListStrings =
-        this.azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+    final Map<String, String> compListStrings = this.azkProps
+        .getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
     if (compListStrings != null) {
       this.comparatorWeightsMap = new TreeMap<>();
       for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
@@ -173,15 +156,18 @@ public class ExecutorManager extends EventHandler implements
 
     this.executorInforRefresherService =
         Executors.newFixedThreadPool(this.azkProps.getInt(
-            AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+            Constants.ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
 
     // configure queue processor
     this.queueProcessor =
-        new QueueProcessorThread(this.azkProps.getBoolean(
-            AZKABAN_QUEUEPROCESSING_ENABLED, true), this.azkProps.getLong(
-            AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), this.azkProps.getInt(
-            AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), this.azkProps.getInt(
-            AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, this.activeExecutors.size()));
+        new QueueProcessorThread(
+            this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
+            this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
+            this.azkProps.getInt(
+                Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
+            this.azkProps.getInt(
+                Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
+                this.activeExecutors.size()));
 
     this.queueProcessor.start();
   }
@@ -232,7 +218,7 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private boolean isMultiExecutorMode() {
-    return this.azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+    return this.azkProps.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
   }
 
   /**
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 88d7812..3406ec9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import azkaban.Constants;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.user.User;
@@ -85,8 +86,8 @@ public class ExecutorManagerTest {
    * Helper method to create a ExecutorManager Instance
    */
   private ExecutorManager createMultiExecutorManagerInstance() throws Exception {
-    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-    this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, "false");
     this.loader.addExecutor("localhost", 12345);
     this.loader.addExecutor("localhost", 12346);
     return createExecutorManager();
@@ -98,7 +99,7 @@ public class ExecutorManagerTest {
    */
   @Test(expected = ExecutorManagerException.class)
   public void testNoExecutorScenario() throws Exception {
-    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     @SuppressWarnings("unused") final ExecutorManager manager = createExecutorManager();
   }
 
@@ -125,7 +126,7 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testMultipleExecutorScenario() throws Exception {
-    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     final Executor executor1 = this.loader.addExecutor("localhost", 12345);
     final Executor executor2 = this.loader.addExecutor("localhost", 12346);
 
@@ -147,7 +148,7 @@ public class ExecutorManagerTest {
    */
   @Test
   public void testSetupExecutorsSucess() throws Exception {
-    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     final Executor executor1 = this.loader.addExecutor("localhost", 12345);
     final ExecutorManager manager = createExecutorManager();
     Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
@@ -170,7 +171,7 @@ public class ExecutorManagerTest {
    */
   @Test(expected = ExecutorManagerException.class)
   public void testSetupExecutorsException() throws Exception {
-    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     final Executor executor1 = this.loader.addExecutor("localhost", 12345);
     final ExecutorManager manager = createExecutorManager();
     final Set<Executor> activeExecutors =
@@ -384,13 +385,13 @@ public class ExecutorManagerTest {
     this.loader = mock(ExecutorLoader.class);
     this.apiGateway = mock(ExecutorApiGateway.class);
     this.user = TestUtils.getTestUser();
-    this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
     //so that flows will be dispatched to executors.
-    this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+    this.props.put(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, "true");
 
     // allow two concurrent runs give one Flow
-    this.props.put(ExecutorManager.AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW, 2);
+    this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 2);
 
     final List<Executor> executors = new ArrayList<>();
     final Executor executor1 = new Executor(1, "localhost", 12345, true);
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index 0ddcc85..2620247 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -18,13 +18,13 @@
 package azkaban.soloserver;
 
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
-import static azkaban.executor.ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS;
 import static java.util.Objects.requireNonNull;
 import static org.apache.commons.io.FileUtils.deleteQuietly;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import azkaban.AzkabanCommonModule;
+import azkaban.Constants;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.execapp.AzkabanExecServerModule;
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,7 +45,6 @@ import org.junit.Test;
 public class AzkabanSingleServerTest {
 
   public static final String AZKABAN_DB_SQL_PATH = "azkaban-db/src/main/sql";
-  private static final Logger log = Logger.getLogger(AzkabanSingleServerTest.class);
   private static final Props props = new Props();
 
   private static String getConfPath() {
@@ -81,7 +79,7 @@ public class AzkabanSingleServerTest {
     props.put("database.type", "h2");
     props.put("h2.path", "./h2");
 
-    props.put(AZKABAN_USE_MULTIPLE_EXECUTORS, "false");
+    props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "false");
     props.put("server.port", "0");
     props.put("jetty.port", "0");
     props.put("server.useSSL", "true");
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index c7ba19a..cb69183 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -19,12 +19,12 @@ package azkaban.webapp;
 
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import static azkaban.ServiceProviderTest.assertSingleton;
-import static azkaban.executor.ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS;
 import static java.util.Objects.requireNonNull;
 import static org.apache.commons.io.FileUtils.deleteQuietly;
 import static org.junit.Assert.assertNotNull;
 
 import azkaban.AzkabanCommonModule;
+import azkaban.Constants;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.db.DatabaseOperator;
@@ -92,7 +92,7 @@ public class AzkabanWebServerTest {
     props.put("database.type", "h2");
     props.put("h2.path", "./h2");
 
-    props.put(AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
     props.put("server.port", "0");
     props.put("jetty.port", "0");
     props.put("server.useSSL", "true");