azkaban-aplcache

ExecutorManager refactor (#1804) 1. Simplify logic 2.

6/15/2018 6:07:47 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index f575706..a85faf6 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -160,6 +160,9 @@ public class Constants {
     // The property is used for the web server to get the host name of the executor when running in SOLO mode.
     public static final String EXECUTOR_HOST = "executor.host";
 
+    // The property is used for the web server to get the port of the executor when running in SOLO mode.
+    public static final String EXECUTOR_PORT = "executor.port";
+
     // Max flow running time in mins, server will kill flows running longer than this setting.
     // if not set or <= 0, then there's no restriction on running time.
     public static final String AZKABAN_MAX_FLOW_RUNNING_MINS = "azkaban.server.flow.max.running.minutes";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3013c4b..dbd9581 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -35,6 +35,7 @@ import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.File;
@@ -92,7 +93,6 @@ public class ExecutorManager extends EventHandler implements
       * 24 * 60 * 60 * 1000L;
   private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
   private static final Logger logger = Logger.getLogger(ExecutorManager.class);
-  final private Set<Executor> activeExecutors = new HashSet<>();
   private final AlerterHolder alerterHolder;
   private final Props azkProps;
   private final CommonMetrics commonMetrics;
@@ -105,6 +105,8 @@ public class ExecutorManager extends EventHandler implements
   private final int maxConcurrentRunsOneFlow;
   QueuedExecutions queuedFlows;
   File cacheDir;
+  //make it immutable to ensure threadsafety
+  private volatile ImmutableSet<Executor> activeExecutors = null;
   private QueueProcessorThread queueProcessor;
   private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
   private long lastCleanerThreadCheckTime = -1;
@@ -213,13 +215,13 @@ public class ExecutorManager extends EventHandler implements
     final Set<Executor> newExecutors = new HashSet<>();
 
     if (isMultiExecutorMode()) {
-      logger.info("Initializing multi executors from database");
+      logger.info("Initializing multi executors from database.");
       newExecutors.addAll(this.executorLoader.fetchActiveExecutors());
-    } else if (this.azkProps.containsKey("executor.port")) {
-      // Add local executor, if specified as per properties
+    } else if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
+      // add local executor, if specified as per properties
       final String executorHost = this.azkProps
           .getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
-      final int executorPort = this.azkProps.getInt("executor.port");
+      final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
       logger.info(String.format("Initializing local executor %s:%d",
           executorHost, executorPort));
       Executor executor =
@@ -232,19 +234,21 @@ public class ExecutorManager extends EventHandler implements
       }
       newExecutors.add(new Executor(executor.getId(), executorHost,
           executorPort, true));
+    } else {
+      // throw exception when in single executor mode and no executor port specified in azkaban
+      // properties
+      //todo chengren311: convert to slf4j and parameterized logging
+      final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
+      logger.error(error);
+      throw new ExecutorManagerException(error);
     }
 
     if (newExecutors.isEmpty()) {
-      logger.error("No active executor found");
-      throw new ExecutorManagerException("No active executor found");
-    } else if (newExecutors.size() > 1 && !isMultiExecutorMode()) {
-      logger.error("Multiple local executors specified");
-      throw new ExecutorManagerException("Multiple local executors specified");
+      final String error = "No active executor found";
+      logger.error(error);
+      throw new ExecutorManagerException(error);
     } else {
-      // clear all active executors, only if we have at least one new active
-      // executors
-      this.activeExecutors.clear();
-      this.activeExecutors.addAll(newExecutors);
+      this.activeExecutors = ImmutableSet.copyOf(newExecutors);
     }
   }
 
@@ -256,41 +260,39 @@ public class ExecutorManager extends EventHandler implements
    * Refresh Executor stats for all the actie executors in this executorManager
    */
   private void refreshExecutors() {
-    synchronized (this.activeExecutors) {
-
-      final List<Pair<Executor, Future<ExecutorInfo>>> futures =
-          new ArrayList<>();
-      for (final Executor executor : this.activeExecutors) {
-        // execute each executorInfo refresh task to fetch
-        final Future<ExecutorInfo> fetchExecutionInfo =
-            this.executorInforRefresherService.submit(
-                () -> this.apiGateway.callForJsonType(executor.getHost(),
-                    executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
-        futures.add(new Pair<>(executor,
-            fetchExecutionInfo));
-      }
 
-      boolean wasSuccess = true;
-      for (final Pair<Executor, Future<ExecutorInfo>> refreshPair : futures) {
-        final Executor executor = refreshPair.getFirst();
-        executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
-        try {
-          // max 5 secs
-          final ExecutorInfo executorInfo = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
-          // executorInfo is null if the response was empty
-          executor.setExecutorInfo(executorInfo);
-          logger.info(String.format(
-              "Successfully refreshed executor: %s with executor info : %s",
-              executor, executorInfo));
-        } catch (final TimeoutException e) {
-          wasSuccess = false;
-          logger.error("Timed out while waiting for ExecutorInfo refresh"
-              + executor, e);
-        } catch (final Exception e) {
-          wasSuccess = false;
-          logger.error("Failed to update ExecutorInfo for executor : "
-              + executor, e);
-        }
+    final List<Pair<Executor, Future<ExecutorInfo>>> futures =
+        new ArrayList<>();
+    for (final Executor executor : this.activeExecutors) {
+      // execute each executorInfo refresh task to fetch
+      final Future<ExecutorInfo> fetchExecutionInfo =
+          this.executorInforRefresherService.submit(
+              () -> this.apiGateway.callForJsonType(executor.getHost(),
+                  executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
+      futures.add(new Pair<>(executor,
+          fetchExecutionInfo));
+    }
+
+    boolean wasSuccess = true;
+    for (final Pair<Executor, Future<ExecutorInfo>> refreshPair : futures) {
+      final Executor executor = refreshPair.getFirst();
+      executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
+      try {
+        // max 5 secs
+        final ExecutorInfo executorInfo = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+        // executorInfo is null if the response was empty
+        executor.setExecutorInfo(executorInfo);
+        logger.info(String.format(
+            "Successfully refreshed executor: %s with executor info : %s",
+            executor, executorInfo));
+      } catch (final TimeoutException e) {
+        wasSuccess = false;
+        logger.error("Timed out while waiting for ExecutorInfo refresh"
+            + executor, e);
+      } catch (final Exception e) {
+        wasSuccess = false;
+        logger.error("Failed to update ExecutorInfo for executor : "
+            + executor, e);
       }
 
       // update is successful for all executors