azkaban-aplcache
Changes
az-core/src/main/java/azkaban/Constants.java 15(+15 -0)
Details
az-core/src/main/java/azkaban/Constants.java 15(+15 -0)
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index d944eed..847d7d3 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -195,6 +195,21 @@ public class Constants {
// dir to keep dependency plugins
public static final String DEPENDENCY_PLUGIN_DIR = "azkaban.dependency.plugin.dir";
+
+ public static final String USE_MULTIPLE_EXECUTORS = "azkaban.use.multiple.executors";
+ public static final String MAX_CONCURRENT_RUNS_ONEFLOW = "azkaban.max.concurrent.runs.oneflow";
+ public static final String WEBSERVER_QUEUE_SIZE = "azkaban.webserver.queue.size";
+ public static final String ACTIVE_EXECUTOR_REFRESH_IN_MS =
+ "azkaban.activeexecutor.refresh.milisecinterval";
+ public static final String ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
+ "azkaban.activeexecutor.refresh.flowinterval";
+ public static final String EXECUTORINFO_REFRESH_MAX_THREADS =
+ "azkaban.executorinfo.refresh.maxThreads";
+ public static final String MAX_DISPATCHING_ERRORS_PERMITTED = "azkaban.maxDispatchingErrors";
+ public static final String EXECUTOR_SELECTOR_FILTERS = "azkaban.executorselector.filters";
+ public static final String EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+ "azkaban.executorselector.comparator.";
+ public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 267cc9a..6cf0fa8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -65,27 +65,7 @@ import org.joda.time.DateTime;
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
- public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
- "azkaban.use.multiple.executors";
- public static final String AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW =
- "azkaban.max.concurrent.runs.oneflow";
- static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
- "azkaban.executorselector.filters";
- static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
- "azkaban.executorselector.comparator.";
- static final String AZKABAN_QUEUEPROCESSING_ENABLED =
- "azkaban.queueprocessing.enabled";
- private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
- "azkaban.webserver.queue.size";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
- "azkaban.activeexecutor.refresh.milisecinterval";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
- "azkaban.activeexecutor.refresh.flowinterval";
private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
- private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
- "azkaban.executorinfo.refresh.maxThreads";
- private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
- "azkaban.maxDispatchingErrors";
// 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
@@ -127,12 +107,14 @@ public class ExecutorManager extends EventHandler implements
this.setupExecutors();
this.loadRunningFlows();
- this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+ this.queuedFlows = new QueuedExecutions(
+ azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
// The default threshold is set to 30 for now, in case some users are affected. We may
// decrease this number in future, to better prevent DDos attacks.
- this.maxConcurrentRunsOneFlow = azkProps.getInt(AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW,
- DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
+ this.maxConcurrentRunsOneFlow = azkProps
+ .getInt(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
+ DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
this.loadQueuedFlows();
this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
@@ -155,15 +137,16 @@ public class ExecutorManager extends EventHandler implements
private void setupMultiExecutorMode() {
// initliatize hard filters for executor selector from azkaban.properties
- final String filters = this.azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+ final String filters = this.azkProps
+ .getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
if (filters != null) {
this.filterList = Arrays.asList(StringUtils.split(filters, ","));
}
// initliatize comparator feature weights for executor selector from
// azkaban.properties
- final Map<String, String> compListStrings =
- this.azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+ final Map<String, String> compListStrings = this.azkProps
+ .getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
if (compListStrings != null) {
this.comparatorWeightsMap = new TreeMap<>();
for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
@@ -173,15 +156,18 @@ public class ExecutorManager extends EventHandler implements
this.executorInforRefresherService =
Executors.newFixedThreadPool(this.azkProps.getInt(
- AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+ Constants.ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
// configure queue processor
this.queueProcessor =
- new QueueProcessorThread(this.azkProps.getBoolean(
- AZKABAN_QUEUEPROCESSING_ENABLED, true), this.azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), this.azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), this.azkProps.getInt(
- AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, this.activeExecutors.size()));
+ new QueueProcessorThread(
+ this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
+ this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
+ this.azkProps.getInt(
+ Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
+ this.azkProps.getInt(
+ Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
+ this.activeExecutors.size()));
this.queueProcessor.start();
}
@@ -232,7 +218,7 @@ public class ExecutorManager extends EventHandler implements
}
private boolean isMultiExecutorMode() {
- return this.azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+ return this.azkProps.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
}
/**
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 88d7812..3406ec9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import azkaban.Constants;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.user.User;
@@ -85,8 +86,8 @@ public class ExecutorManagerTest {
* Helper method to create a ExecutorManager Instance
*/
private ExecutorManager createMultiExecutorManagerInstance() throws Exception {
- this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+ this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, "false");
this.loader.addExecutor("localhost", 12345);
this.loader.addExecutor("localhost", 12346);
return createExecutorManager();
@@ -98,7 +99,7 @@ public class ExecutorManagerTest {
*/
@Test(expected = ExecutorManagerException.class)
public void testNoExecutorScenario() throws Exception {
- this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
@SuppressWarnings("unused") final ExecutorManager manager = createExecutorManager();
}
@@ -125,7 +126,7 @@ public class ExecutorManagerTest {
*/
@Test
public void testMultipleExecutorScenario() throws Exception {
- this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
final Executor executor1 = this.loader.addExecutor("localhost", 12345);
final Executor executor2 = this.loader.addExecutor("localhost", 12346);
@@ -147,7 +148,7 @@ public class ExecutorManagerTest {
*/
@Test
public void testSetupExecutorsSucess() throws Exception {
- this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
final Executor executor1 = this.loader.addExecutor("localhost", 12345);
final ExecutorManager manager = createExecutorManager();
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
@@ -170,7 +171,7 @@ public class ExecutorManagerTest {
*/
@Test(expected = ExecutorManagerException.class)
public void testSetupExecutorsException() throws Exception {
- this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
final Executor executor1 = this.loader.addExecutor("localhost", 12345);
final ExecutorManager manager = createExecutorManager();
final Set<Executor> activeExecutors =
@@ -384,13 +385,13 @@ public class ExecutorManagerTest {
this.loader = mock(ExecutorLoader.class);
this.apiGateway = mock(ExecutorApiGateway.class);
this.user = TestUtils.getTestUser();
- this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
//To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
//so that flows will be dispatched to executors.
- this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+ this.props.put(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, "true");
// allow two concurrent runs give one Flow
- this.props.put(ExecutorManager.AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW, 2);
+ this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 2);
final List<Executor> executors = new ArrayList<>();
final Executor executor1 = new Executor(1, "localhost", 12345, true);
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index 0ddcc85..2620247 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -18,13 +18,13 @@
package azkaban.soloserver;
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
-import static azkaban.executor.ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.io.FileUtils.deleteQuietly;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import azkaban.AzkabanCommonModule;
+import azkaban.Constants;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.execapp.AzkabanExecServerModule;
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
-import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -46,7 +45,6 @@ import org.junit.Test;
public class AzkabanSingleServerTest {
public static final String AZKABAN_DB_SQL_PATH = "azkaban-db/src/main/sql";
- private static final Logger log = Logger.getLogger(AzkabanSingleServerTest.class);
private static final Props props = new Props();
private static String getConfPath() {
@@ -81,7 +79,7 @@ public class AzkabanSingleServerTest {
props.put("database.type", "h2");
props.put("h2.path", "./h2");
- props.put(AZKABAN_USE_MULTIPLE_EXECUTORS, "false");
+ props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "false");
props.put("server.port", "0");
props.put("jetty.port", "0");
props.put("server.useSSL", "true");
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index c7ba19a..cb69183 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -19,12 +19,12 @@ package azkaban.webapp;
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
import static azkaban.ServiceProviderTest.assertSingleton;
-import static azkaban.executor.ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.io.FileUtils.deleteQuietly;
import static org.junit.Assert.assertNotNull;
import azkaban.AzkabanCommonModule;
+import azkaban.Constants;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.db.DatabaseOperator;
@@ -92,7 +92,7 @@ public class AzkabanWebServerTest {
props.put("database.type", "h2");
props.put("h2.path", "./h2");
- props.put(AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
props.put("server.port", "0");
props.put("jetty.port", "0");
props.put("server.useSSL", "true");