azkaban-aplcache
Changes
docs/configuration.rst 9(+1 -8)
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 0f2b336..00f0135 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -58,8 +58,7 @@ public class Constants {
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
public static final String DEFAULT_CONF_PATH = "conf";
- public static final String AZKABAN_EXECUTOR_PORT_FILENAME = "executor.port";
- public static final String AZKABAN_EXECUTOR_PORT_FILE = "executor.portfile";
+ public static final String DEFAULT_EXECUTOR_PORT_FILE = "executor.port";
public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
@@ -156,10 +155,8 @@ public class Constants {
// Legacy configs section, new configs should follow the naming convention of azkaban.server.<rest of the name> for server configs.
- // The property is used for the web server to get the host name of the executor when running in SOLO mode.
- public static final String EXECUTOR_HOST = "executor.host";
-
- // The property is used for the web server to get the port of the executor when running in SOLO mode.
+ public static final String EXECUTOR_PORT_FILE = "executor.portfile";
+ // To set a fixed port for executor-server. Otherwise some available port is used.
public static final String EXECUTOR_PORT = "executor.port";
// Max flow running time in mins, server will kill flows running longer than this setting.
diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
index 28b8bb0..4b4356d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
@@ -16,8 +16,6 @@
package azkaban.executor;
-import azkaban.Constants.ConfigurationKeys;
-import azkaban.utils.Props;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import javax.inject.Inject;
@@ -31,12 +29,10 @@ public class ActiveExecutors {
private static final Logger logger = Logger.getLogger(ExecutorManager.class);
private volatile ImmutableSet<Executor> activeExecutors;
- private final Props azkProps;
private final ExecutorLoader executorLoader;
@Inject
- public ActiveExecutors(final Props azkProps, final ExecutorLoader executorLoader) {
- this.azkProps = azkProps;
+ public ActiveExecutors(final ExecutorLoader executorLoader) {
this.executorLoader = executorLoader;
}
@@ -47,13 +43,7 @@ public class ActiveExecutors {
* fails.
*/
public void setupExecutors() throws ExecutorManagerException {
- final ImmutableSet<Executor> newExecutors;
- if (ExecutorManager.isMultiExecutorMode(this.azkProps)) {
- newExecutors = setupMultiExecutors();
- } else {
- // TODO remove this - switch everything to use multi-executor mode
- newExecutors = setupSingleExecutor();
- }
+ final ImmutableSet<Executor> newExecutors = loadExecutors();
if (newExecutors.isEmpty()) {
final String error = "No active executors found";
logger.error(error);
@@ -72,38 +62,9 @@ public class ActiveExecutors {
return this.activeExecutors;
}
- private ImmutableSet<Executor> setupMultiExecutors() throws ExecutorManagerException {
- logger.info("Initializing multi executors from database.");
+ private ImmutableSet<Executor> loadExecutors() throws ExecutorManagerException {
+ logger.info("Initializing executors from database.");
return ImmutableSet.copyOf(this.executorLoader.fetchActiveExecutors());
}
- private ImmutableSet<Executor> setupSingleExecutor() throws ExecutorManagerException {
- if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
- return getOrAddSingleExecutor();
- } else {
- // throw exception when in single executor mode and no executor port specified in azkaban
- // properties
- //todo chengren311: convert to slf4j and parameterized logging
- final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
- logger.error(error);
- throw new ExecutorManagerException(error);
- }
- }
-
- private ImmutableSet<Executor> getOrAddSingleExecutor() throws ExecutorManagerException {
- // add single executor, if specified as per properties
- final String executorHost = this.azkProps
- .getString(ConfigurationKeys.EXECUTOR_HOST, "localhost");
- final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
- logger.info(String.format("Initializing single executor %s:%d", executorHost, executorPort));
- Executor executor = this.executorLoader.fetchExecutor(executorHost, executorPort);
- if (executor == null) {
- executor = this.executorLoader.addExecutor(executorHost, executorPort);
- } else if (!executor.isActive()) {
- executor.setActive(true);
- this.executorLoader.updateExecutor(executor);
- }
- return ImmutableSet.of(new Executor(executor.getId(), executorHost, executorPort, true));
- }
-
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3fede00..87e0346 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -108,8 +108,9 @@ public class ExecutorManager extends EventHandler implements
private List<String> filterList;
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
- private ExecutorService executorInforRefresherService;
+ private final ExecutorService executorInfoRefresherService;
private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
+ private boolean initialized = false;
@Inject
public ExecutorManager(final Props azkProps, final ExecutorLoader executorLoader,
@@ -129,30 +130,39 @@ public class ExecutorManager extends EventHandler implements
this.updaterStage = updaterStage;
this.executionFinalizer = executionFinalizer;
this.updaterThread = updaterThread;
- this.setupExecutors();
- this.loadRunningExecutions();
-
- this.queuedFlows = new QueuedExecutions(
- azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
+ this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
+ this.cleanerThread = createCleanerThread();
+ this.executorInfoRefresherService = createExecutorInfoRefresherService();
+ }
+ private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
// The default threshold is set to 30 for now, in case some users are affected. We may
// decrease this number in future, to better prevent DDos attacks.
- this.maxConcurrentRunsOneFlow = azkProps
- .getInt(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
- DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
- this.loadQueuedFlows();
+ return azkProps.getInt(ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW,
+ DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
+ }
- this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
+ private CleanerThread createCleanerThread() {
+ final long executionLogsRetentionMs = this.azkProps.getLong("execution.logs.retention.ms",
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ return new CleanerThread(executionLogsRetentionMs);
+ }
- if (isMultiExecutorMode()) {
- setupMultiExecutorMode();
+ void initialize() throws ExecutorManagerException {
+ if (this.initialized) {
+ return;
}
-
- final long executionLogsRetentionMs =
- azkProps.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
-
- this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
+ this.initialized = true;
+ this.setupExecutors();
+ this.loadRunningExecutions();
+ this.queuedFlows = new QueuedExecutions(
+ this.azkProps.getLong(ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
+ this.loadQueuedFlows();
+ this.cacheDir = new File(this.azkProps.getString("cache.directory", "cache"));
+ // TODO extract QueueProcessor as a separate class, move all of this into it
+ setupExecutotrComparatorWeightsMap();
+ setupExecutorFilterList();
+ this.queueProcessor = setupQueueProcessor();
}
// TODO move to some common place
@@ -167,17 +177,11 @@ public class ExecutorManager extends EventHandler implements
}
}
- // TODO switch to always use "multi executor mode" - even for single server
- public static boolean isMultiExecutorMode(final Props props) {
- return props.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
- }
-
- public void start() {
+ public void start() throws ExecutorManagerException {
+ initialize();
this.updaterThread.start();
this.cleanerThread.start();
- if (isMultiExecutorMode()) {
- this.queueProcessor.start();
- }
+ this.queueProcessor.start();
}
private String findApplicationIdFromLog(final String logData) {
@@ -190,39 +194,42 @@ public class ExecutorManager extends EventHandler implements
return appId;
}
- private void setupMultiExecutorMode() {
- // initialize hard filters for executor selector from azkaban.properties
- final String filters = this.azkProps
- .getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
- if (filters != null) {
- this.filterList = Arrays.asList(StringUtils.split(filters, ","));
- }
+ private QueueProcessorThread setupQueueProcessor() {
+ return new QueueProcessorThread(
+ this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
+ this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
+ this.azkProps.getInt(
+ Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
+ this.azkProps.getInt(
+ Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
+ this.activeExecutors.getAll().size()),
+ this.sleepAfterDispatchFailure);
+ }
+ private void setupExecutotrComparatorWeightsMap() {
// initialize comparator feature weights for executor selector from azkaban.properties
final Map<String, String> compListStrings = this.azkProps
- .getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+ .getMapByPrefix(ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
if (compListStrings != null) {
this.comparatorWeightsMap = new TreeMap<>();
for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
this.comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
}
}
+ }
- this.executorInforRefresherService =
- Executors.newFixedThreadPool(this.azkProps.getInt(
- Constants.ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+ private void setupExecutorFilterList() {
+ // initialize hard filters for executor selector from azkaban.properties
+ final String filters = this.azkProps
+ .getString(ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
+ if (filters != null) {
+ this.filterList = Arrays.asList(StringUtils.split(filters, ","));
+ }
+ }
- // configure queue processor
- this.queueProcessor =
- new QueueProcessorThread(
- this.azkProps.getBoolean(Constants.ConfigurationKeys.QUEUEPROCESSING_ENABLED, true),
- this.azkProps.getLong(Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
- this.azkProps.getInt(
- Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
- this.azkProps.getInt(
- Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
- this.activeExecutors.getAll().size()),
- this.sleepAfterDispatchFailure);
+ private ExecutorService createExecutorInfoRefresherService() {
+ return Executors.newFixedThreadPool(this.azkProps.getInt(
+ ConfigurationKeys.EXECUTORINFO_REFRESH_MAX_THREADS, 5));
}
/**
@@ -232,11 +239,21 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public void setupExecutors() throws ExecutorManagerException {
+ checkMultiExecutorMode();
this.activeExecutors.setupExecutors();
}
- private boolean isMultiExecutorMode() {
- return isMultiExecutorMode(this.azkProps);
+ // TODO Enforced for now to ensure that users migrate to multi-executor mode acknowledgingly.
+ // TODO Remove this once confident enough that all active users have already updated to some
+ // version new enough to have this change - for example after 1 year has passed.
+ // TODO Then also delete ConfigurationKeys.USE_MULTIPLE_EXECUTORS.
+ @Deprecated
+ private void checkMultiExecutorMode() {
+ if (!this.azkProps.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false)) {
+ throw new IllegalArgumentException(
+ Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS +
+ " must be true. Single executor mode is not supported any more.");
+ }
}
/**
@@ -249,7 +266,7 @@ public class ExecutorManager extends EventHandler implements
for (final Executor executor : this.activeExecutors.getAll()) {
// execute each executorInfo refresh task to fetch
final Future<ExecutorInfo> fetchExecutionInfo =
- this.executorInforRefresherService.submit(
+ this.executorInfoRefresherService.submit(
() -> this.apiGateway.callForJsonType(executor.getHost(),
executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
futures.add(new Pair<>(executor,
@@ -286,41 +303,23 @@ public class ExecutorManager extends EventHandler implements
}
/**
- * Throws exception if running in local mode {@inheritDoc}
- *
* @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
*/
@Override
- public void disableQueueProcessorThread() throws ExecutorManagerException {
- if (isMultiExecutorMode()) {
- this.queueProcessor.setActive(false);
- } else {
- throw new ExecutorManagerException(
- "Cannot disable QueueProcessor in local mode");
- }
+ public void disableQueueProcessorThread() {
+ this.queueProcessor.setActive(false);
}
/**
- * Throws exception if running in local mode {@inheritDoc}
- *
* @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
*/
@Override
- public void enableQueueProcessorThread() throws ExecutorManagerException {
- if (isMultiExecutorMode()) {
- this.queueProcessor.setActive(true);
- } else {
- throw new ExecutorManagerException(
- "Cannot enable QueueProcessor in local mode");
- }
+ public void enableQueueProcessorThread() {
+ this.queueProcessor.setActive(true);
}
public State getQueueProcessorThreadState() {
- if (isMultiExecutorMode()) {
- return this.queueProcessor.getState();
- } else {
- return State.NEW; // not started in local mode
- }
+ return this.queueProcessor.getState();
}
/**
@@ -328,11 +327,7 @@ public class ExecutorManager extends EventHandler implements
* dispatched as expected
*/
public boolean isQueueProcessorThreadActive() {
- if (isMultiExecutorMode()) {
- return this.queueProcessor.isActive();
- } else {
- return false;
- }
+ return this.queueProcessor.isActive();
}
/**
@@ -1113,30 +1108,9 @@ public class ExecutorManager extends EventHandler implements
final ExecutionReference reference =
new ExecutionReference(exflow.getExecutionId());
- if (isMultiExecutorMode()) {
- //Take MultiExecutor route
- this.executorLoader.addActiveExecutableReference(reference);
- this.queuedFlows.enqueue(exflow, reference);
- } else {
- // assign only local executor we have
- final Executor choosenExecutor = this.activeExecutors.getAll().iterator().next();
- this.executorLoader.addActiveExecutableReference(reference);
- try {
- dispatch(reference, exflow, choosenExecutor);
- this.commonMetrics.markDispatchSuccess();
- } catch (final ExecutorManagerException e) {
- // When flow dispatch fails, should update the flow status
- // to FAILED in execution_flows DB table as well. Currently
- // this logic is only implemented in multiExecutorMode but
- // missed in single executor case.
- this.commonMetrics.markDispatchFail();
- this.executionFinalizer.finalizeFlow(exflow, "Dispatching failed", e);
- throw e;
- }
- }
- message +=
- "Execution submitted successfully with exec id "
- + exflow.getExecutionId();
+ this.executorLoader.addActiveExecutableReference(reference);
+ this.queuedFlows.enqueue(exflow, reference);
+ message += "Execution queued successfully with exec id " + exflow.getExecutionId();
}
return message;
}
@@ -1199,9 +1173,7 @@ public class ExecutorManager extends EventHandler implements
@Override
public void shutdown() {
- if (isMultiExecutorMode()) {
- this.queueProcessor.shutdown();
- }
+ this.queueProcessor.shutdown();
this.updaterThread.shutdown();
}
@@ -1478,7 +1450,7 @@ public class ExecutorManager extends EventHandler implements
+ "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+ " (tried " + reference.getNumErrors() + " executors)";
ExecutorManager.logger.error(message);
- executionFinalizer.finalizeFlow(exflow, message, lastError);
+ ExecutorManager.this.executionFinalizer.finalizeFlow(exflow, message, lastError);
}
}
@@ -1486,7 +1458,7 @@ public class ExecutorManager extends EventHandler implements
final Executor selectedExecutor) {
remainingExecutors.remove(selectedExecutor);
if (remainingExecutors.isEmpty()) {
- remainingExecutors.addAll(activeExecutors.getAll());
+ remainingExecutors.addAll(ExecutorManager.this.activeExecutors.getAll());
sleepAfterDispatchFailure();
}
}
@@ -1579,7 +1551,7 @@ public class ExecutorManager extends EventHandler implements
}
@VisibleForTesting
- void setSleepAfterDispatchFailure(Duration sleepAfterDispatchFailure) {
+ void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 48017c6..81e1419 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -17,6 +17,7 @@
package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
@@ -113,21 +115,15 @@ public class ExecutorManagerTest {
}
/*
- * Test backward compatibility with just local executor
+ * Test error message with unsupported local executor conf
*/
@Test
- public void testLocalExecutorScenario() throws Exception {
- this.props.put("executor.port", 12345);
- final ExecutorManager manager = createExecutorManager();
- final Set<Executor> activeExecutors =
- new HashSet(manager.getAllActiveExecutors());
-
- Assert.assertEquals(activeExecutors.size(), 1);
- final Executor executor = activeExecutors.iterator().next();
- Assert.assertEquals(executor.getHost(), "localhost");
- Assert.assertEquals(executor.getPort(), 12345);
- Assert.assertArrayEquals(activeExecutors.toArray(), this.loader
- .fetchActiveExecutors().toArray());
+ public void testLocalExecutorScenario() {
+ this.props.put(ConfigurationKeys.EXECUTOR_PORT, 12345);
+ final Throwable thrown = catchThrowable(() -> createExecutorManager());
+ assertThat(thrown).isInstanceOf(IllegalArgumentException.class);
+ assertThat(thrown.getMessage()).isEqualTo(
+ "azkaban.use.multiple.executors must be true. Single executor mode is not supported any more.");
}
/*
@@ -149,7 +145,7 @@ public class ExecutorManagerTest {
private ExecutorManager createExecutorManager()
throws ExecutorManagerException {
// TODO rename this test to ExecutorManagerIntegrationTest & create separate unit tests as well?
- final ActiveExecutors activeExecutors = new ActiveExecutors(this.props, this.loader);
+ final ActiveExecutors activeExecutors = new ActiveExecutors(this.loader);
final ExecutionFinalizer executionFinalizer = new ExecutionFinalizer(this.loader,
this.updaterStage, this.alertHolder, this.runningExecutions);
final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
@@ -162,6 +158,7 @@ public class ExecutorManagerTest {
this.commonMetrics, this.apiGateway, this.runningExecutions, activeExecutors,
this.updaterStage, executionFinalizer, updaterThread);
executorManager.setSleepAfterDispatchFailure(Duration.ZERO);
+ executorManager.initialize();
return executorManager;
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index c8ac487..6021af2 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -18,6 +18,7 @@ package azkaban.trigger;
import static org.mockito.Mockito.mock;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.executor.ActiveExecutors;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutionFinalizer;
@@ -61,7 +62,7 @@ public class TriggerManagerDeadlockTest {
this.loader = new MockTriggerLoader();
final Props props = new Props();
props.put("trigger.scan.interval", 1000);
- props.put("executor.port", 12321);
+ props.put(ConfigurationKeys.EXECUTOR_PORT, 12321);
this.execLoader = new MockExecutorLoader();
this.apiGateway = mock(ExecutorApiGateway.class);
this.runningExecutions = new RunningExecutions();
@@ -75,7 +76,7 @@ public class TriggerManagerDeadlockTest {
}
private ExecutorManager getExecutorManager(final Props props) throws ExecutorManagerException {
- final ActiveExecutors activeExecutors = new ActiveExecutors(props, this.execLoader);
+ final ActiveExecutors activeExecutors = new ActiveExecutors(this.execLoader);
final RunningExecutionsUpdaterThread updaterThread = getRunningExecutionsUpdaterThread();
return new ExecutorManager(props, this.execLoader, this.commonMetrics, this.apiGateway,
this.runningExecutions, activeExecutors, this.updaterStage, this.executionFinalizer,
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 99031f2..195cb52 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -16,7 +16,7 @@
package azkaban.execapp;
-import static azkaban.Constants.AZKABAN_EXECUTOR_PORT_FILENAME;
+import static azkaban.Constants.DEFAULT_EXECUTOR_PORT_FILE;
import static azkaban.Constants.ConfigurationKeys;
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
import static azkaban.execapp.ExecJettyServerModule.EXEC_JETTY_SERVER;
@@ -289,7 +289,7 @@ public class AzkabanExecutorServer {
private void dumpPortToFile() throws IOException {
// By default this should write to the working directory
final String portFileName = this.props
- .getString(Constants.AZKABAN_EXECUTOR_PORT_FILE, AZKABAN_EXECUTOR_PORT_FILENAME);
+ .getString(ConfigurationKeys.EXECUTOR_PORT_FILE, DEFAULT_EXECUTOR_PORT_FILE);
FileIOUtils.dumpNumberToFile(Paths.get(portFileName), getPort());
}
@@ -468,7 +468,7 @@ public class AzkabanExecutorServer {
* @return hostname
*/
public String getHost() {
- if (this.props.containsKey(Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME)) {
+ if (this.props.containsKey(ConfigurationKeys.AZKABAN_SERVER_HOST_NAME)) {
final String hostName = this.props
.getString(Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME);
if (!StringUtils.isEmpty(hostName)) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
index f449281..3bd2b63 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecJettyServerModule.java
@@ -1,5 +1,6 @@
package azkaban.execapp;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.utils.Props;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@@ -38,7 +39,7 @@ public class ExecJettyServerModule extends AbstractModule {
* The Jetty server automatically finds an unused port when the port number is set to zero
* TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
*/
- final Server server = new Server(props.getInt("executor.port", 0));
+ final Server server = new Server(props.getInt(ConfigurationKeys.EXECUTOR_PORT, 0));
final QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
server.setThreadPool(httpThreadPool);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java
index 2e50b15..35b01ae 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecutorServerTest.java
@@ -75,7 +75,7 @@ public class AzkabanExecutorServerTest {
public static void tearDown() throws Exception {
deleteQuietly(new File("h2.mv.db"));
deleteQuietly(new File("h2.trace.db"));
- deleteQuietly(new File("executor.port"));
+ deleteQuietly(new File(Constants.DEFAULT_EXECUTOR_PORT_FILE));
deleteQuietly(new File("executions"));
deleteQuietly(new File("projects"));
}
diff --git a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
index 2620247..cc2903c 100644
--- a/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
+++ b/azkaban-solo-server/src/test/java/azkaban/soloserver/AzkabanSingleServerTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import azkaban.AzkabanCommonModule;
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.database.AzkabanDatabaseUpdater;
import azkaban.execapp.AzkabanExecServerModule;
@@ -62,10 +63,10 @@ public class AzkabanSingleServerTest {
}
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDown() {
deleteQuietly(new File("h2.mv.db"));
deleteQuietly(new File("h2.trace.db"));
- deleteQuietly(new File("executor.port"));
+ deleteQuietly(new File(Constants.DEFAULT_EXECUTOR_PORT_FILE));
deleteQuietly(new File("executions"));
deleteQuietly(new File("projects"));
}
@@ -79,13 +80,13 @@ public class AzkabanSingleServerTest {
props.put("database.type", "h2");
props.put("h2.path", "./h2");
- props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "false");
+ props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
props.put("server.port", "0");
props.put("jetty.port", "0");
props.put("server.useSSL", "true");
props.put("jetty.use.ssl", "false");
props.put("user.manager.xml.file", new File(confPath, "azkaban-users.xml").getPath());
- props.put("executor.port", "12321");
+ props.put(ConfigurationKeys.EXECUTOR_PORT, "12321");
// Quartz settings
props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
@@ -98,13 +99,13 @@ public class AzkabanSingleServerTest {
}
@Test
- public void testInjection() throws Exception {
+ public void testInjection() {
SERVICE_PROVIDER.unsetInjector();
/* Initialize Guice Injector */
final Injector injector = Guice.createInjector(
new AzkabanCommonModule(props),
- new AzkabanWebServerModule(),
- new AzkabanExecServerModule()
+ new AzkabanExecServerModule(),
+ new AzkabanWebServerModule()
);
SERVICE_PROVIDER.setInjector(injector);
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index cb69183..7a11e4e 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -111,7 +111,7 @@ public class AzkabanWebServerTest {
deleteQuietly(new File("h2.mv.db"));
deleteQuietly(new File("h2.trace.db"));
- deleteQuietly(new File("executor.port"));
+ deleteQuietly(new File(Constants.DEFAULT_EXECUTOR_PORT_FILE));
deleteQuietly(new File("executions"));
deleteQuietly(new File("projects"));
}
docs/configuration.rst 9(+1 -8)
diff --git a/docs/configuration.rst b/docs/configuration.rst
index b6d0b10..7277750 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -216,13 +216,6 @@ Executor Manager Properties
+-----------------------+-----------------------+-----------------------+
| Parameter | Description | Default |
+=======================+=======================+=======================+
-| executor.port | The port for the | 12321 |
-| | azkaban executor | |
-| | server | |
-+-----------------------+-----------------------+-----------------------+
-| executor.host | The host for azkaban | localhost |
-| | executor server | |
-+-----------------------+-----------------------+-----------------------+
| execution.logs.retent | Time in milliseconds | 7257600000L (12 |
| ion.ms | that execution logs | weeks) |
| | are retained | |
@@ -288,7 +281,7 @@ Executor Server Properties
+-----------------------+-----------------------+-----------------------+
| Parameter | Description | Default |
+=======================+=======================+=======================+
-| executor.port | The port for azkaban | 12321 |
+| executor.port | The port for azkaban | 0 (any free port) |
| | executor server | |
+-----------------------+-----------------------+-----------------------+
| executor.global.pro | A path to the | none |