diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index f575706..a85faf6 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -160,6 +160,9 @@ public class Constants {
// The property is used for the web server to get the host name of the executor when running in SOLO mode.
public static final String EXECUTOR_HOST = "executor.host";
+ // The property is used for the web server to get the port of the executor when running in SOLO mode.
+ public static final String EXECUTOR_PORT = "executor.port";
+
// Max flow running time in mins, server will kill flows running longer than this setting.
// if not set or <= 0, then there's no restriction on running time.
public static final String AZKABAN_MAX_FLOW_RUNNING_MINS = "azkaban.server.flow.max.running.minutes";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3013c4b..dbd9581 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -35,6 +35,7 @@ import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
@@ -92,7 +93,6 @@ public class ExecutorManager extends EventHandler implements
* 24 * 60 * 60 * 1000L;
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
private static final Logger logger = Logger.getLogger(ExecutorManager.class);
- final private Set<Executor> activeExecutors = new HashSet<>();
private final AlerterHolder alerterHolder;
private final Props azkProps;
private final CommonMetrics commonMetrics;
@@ -105,6 +105,8 @@ public class ExecutorManager extends EventHandler implements
private final int maxConcurrentRunsOneFlow;
QueuedExecutions queuedFlows;
File cacheDir;
+ //make it immutable to ensure threadsafety
+ private volatile ImmutableSet<Executor> activeExecutors = null;
private QueueProcessorThread queueProcessor;
private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
private long lastCleanerThreadCheckTime = -1;
@@ -213,13 +215,13 @@ public class ExecutorManager extends EventHandler implements
final Set<Executor> newExecutors = new HashSet<>();
if (isMultiExecutorMode()) {
- logger.info("Initializing multi executors from database");
+ logger.info("Initializing multi executors from database.");
newExecutors.addAll(this.executorLoader.fetchActiveExecutors());
- } else if (this.azkProps.containsKey("executor.port")) {
- // Add local executor, if specified as per properties
+ } else if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
+ // add local executor, if specified as per properties
final String executorHost = this.azkProps
.getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
- final int executorPort = this.azkProps.getInt("executor.port");
+ final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
logger.info(String.format("Initializing local executor %s:%d",
executorHost, executorPort));
Executor executor =
@@ -232,19 +234,21 @@ public class ExecutorManager extends EventHandler implements
}
newExecutors.add(new Executor(executor.getId(), executorHost,
executorPort, true));
+ } else {
+ // throw exception when in single executor mode and no executor port specified in azkaban
+ // properties
+ //todo chengren311: convert to slf4j and parameterized logging
+ final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
+ logger.error(error);
+ throw new ExecutorManagerException(error);
}
if (newExecutors.isEmpty()) {
- logger.error("No active executor found");
- throw new ExecutorManagerException("No active executor found");
- } else if (newExecutors.size() > 1 && !isMultiExecutorMode()) {
- logger.error("Multiple local executors specified");
- throw new ExecutorManagerException("Multiple local executors specified");
+ final String error = "No active executor found";
+ logger.error(error);
+ throw new ExecutorManagerException(error);
} else {
- // clear all active executors, only if we have at least one new active
- // executors
- this.activeExecutors.clear();
- this.activeExecutors.addAll(newExecutors);
+ this.activeExecutors = ImmutableSet.copyOf(newExecutors);
}
}
@@ -256,41 +260,39 @@ public class ExecutorManager extends EventHandler implements
* Refresh Executor stats for all the actie executors in this executorManager
*/
private void refreshExecutors() {
- synchronized (this.activeExecutors) {
-
- final List<Pair<Executor, Future<ExecutorInfo>>> futures =
- new ArrayList<>();
- for (final Executor executor : this.activeExecutors) {
- // execute each executorInfo refresh task to fetch
- final Future<ExecutorInfo> fetchExecutionInfo =
- this.executorInforRefresherService.submit(
- () -> this.apiGateway.callForJsonType(executor.getHost(),
- executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
- futures.add(new Pair<>(executor,
- fetchExecutionInfo));
- }
- boolean wasSuccess = true;
- for (final Pair<Executor, Future<ExecutorInfo>> refreshPair : futures) {
- final Executor executor = refreshPair.getFirst();
- executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
- try {
- // max 5 secs
- final ExecutorInfo executorInfo = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
- // executorInfo is null if the response was empty
- executor.setExecutorInfo(executorInfo);
- logger.info(String.format(
- "Successfully refreshed executor: %s with executor info : %s",
- executor, executorInfo));
- } catch (final TimeoutException e) {
- wasSuccess = false;
- logger.error("Timed out while waiting for ExecutorInfo refresh"
- + executor, e);
- } catch (final Exception e) {
- wasSuccess = false;
- logger.error("Failed to update ExecutorInfo for executor : "
- + executor, e);
- }
+ final List<Pair<Executor, Future<ExecutorInfo>>> futures =
+ new ArrayList<>();
+ for (final Executor executor : this.activeExecutors) {
+ // execute each executorInfo refresh task to fetch
+ final Future<ExecutorInfo> fetchExecutionInfo =
+ this.executorInforRefresherService.submit(
+ () -> this.apiGateway.callForJsonType(executor.getHost(),
+ executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
+ futures.add(new Pair<>(executor,
+ fetchExecutionInfo));
+ }
+
+ boolean wasSuccess = true;
+ for (final Pair<Executor, Future<ExecutorInfo>> refreshPair : futures) {
+ final Executor executor = refreshPair.getFirst();
+ executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
+ try {
+ // max 5 secs
+ final ExecutorInfo executorInfo = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+ // executorInfo is null if the response was empty
+ executor.setExecutorInfo(executorInfo);
+ logger.info(String.format(
+ "Successfully refreshed executor: %s with executor info : %s",
+ executor, executorInfo));
+ } catch (final TimeoutException e) {
+ wasSuccess = false;
+ logger.error("Timed out while waiting for ExecutorInfo refresh"
+ + executor, e);
+ } catch (final Exception e) {
+ wasSuccess = false;
+ logger.error("Failed to update ExecutorInfo for executor : "
+ + executor, e);
}
// update is successful for all executors