azkaban-developers

incorporating feedback for pr #487;using moving executor

9/14/2015 6:42:38 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutorsManager.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutorsManager.java
new file mode 100644
index 0000000..c904ff0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutorsManager.java
@@ -0,0 +1,9 @@
+package azkaban.executor;
+
+public class ActiveExecutorsManager {
+
+  public ActiveExecutorsManager() {
+    // TODO Auto-generated constructor stub
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index e21ae2a..f0600ab 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -92,9 +92,9 @@ public class Executor implements Comparable<Executor> {
 
   @Override
   public String toString(){
-    return String.format("%s (id: %s)",
-        null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
-        this.id);
+    return String.format("%s:%s (id: %s)",
+      null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+      this.port, this.id);
   }
 
   public String getHost() {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index bf87497..7fbae66 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -30,10 +30,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
@@ -43,6 +47,8 @@ import azkaban.alert.Alerter;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
 import azkaban.executor.selector.ExecutorSelector;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
@@ -71,8 +77,10 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.webserver.queue.size";
   private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
     "azkaban.activeexecutor.refresh.milisecinterval";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
     "azkaban.activeexecutor.refresh.flowinterval";
+  private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
+      "azkaban.executorinfo.refresh.maxThreads";
 
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
@@ -102,9 +110,11 @@ public class ExecutorManager extends EventHandler implements
 
   File cacheDir;
 
-  final Props azkProps;
-  List<String> filterList;
-  Map<String, Integer> comparatorWeightsMap;
+  private final Props azkProps;
+  private List<String> filterList;
+  private Map<String, Integer> comparatorWeightsMap;
+  private long lastSuccessfulExecutorInfoRefresh;
+  private ExecutorService executorInforRefresherService;
 
   public ExecutorManager(Props props, ExecutorLoader loader,
       Map<String, Alerter> alters) throws ExecutorManagerException {
@@ -155,12 +165,16 @@ public class ExecutorManager extends EventHandler implements
       }
     }
 
+    executorInforRefresherService =
+        Executors.newFixedThreadPool(azkProps.getInt(
+          AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+
     // configure queue processor
     queueProcessor =
       new QueueProcessorThread(azkProps.getBoolean(
         AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
         AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
+        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
     queueProcessor.start();
   }
 
@@ -195,8 +209,10 @@ public class ExecutorManager extends EventHandler implements
     }
 
     if (newExecutors.isEmpty()) {
+      logger.error("No active executor found");
       throw new ExecutorManagerException("No active executor found");
     } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+      logger.error("Multiple local executors specified");
       throw new ExecutorManagerException("Multiple local executors specified");
     } else {
       // clear all active executors, only if we have at least one new active
@@ -215,34 +231,46 @@ public class ExecutorManager extends EventHandler implements
    */
   private void refreshExecutors() {
     synchronized (activeExecutors) {
-      ExecutorService taskExecutors =
-        Executors.newFixedThreadPool(activeExecutors.size());
-      for (final Executor executor : activeExecutors) {
 
-        // execute each executorInfo refresh task
-        taskExecutors.execute(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              String jsonResponse =
-                callExecutorForJsonString(executor.getHost(),
-                  executor.getPort(), "/serverstastics", null);
-              executor.setExecutorInfo(ExecutorInfo
-                .fromJSONString(jsonResponse));
-            } catch (Exception e) {
-              logger.error("Failed to update ExecutorInfo executorId :"
-                + executor, e);
+      List<Pair<Executor, Future<String>>> futures =
+        new ArrayList<Pair<Executor, Future<String>>>();
+      for (final Executor executor : activeExecutors) {
+        // execute each executorInfo refresh task to fetch
+        Future<String> fetchExecutionInfo =
+          executorInforRefresherService.submit(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+              return callExecutorForJsonString(executor.getHost(),
+                executor.getPort(), "/serverstastics", null);
             }
-          }
-        });
+          });
+        futures.add(new Pair<Executor, Future<String>>(executor,
+          fetchExecutionInfo));
+      }
+
+      boolean wasSuccess = true;
+      for (Pair<Executor, Future<String>> refreshPair : futures) {
+        Executor executor = refreshPair.getFirst();
+        try {
+          // max 5 secs
+          String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+          executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+          logger.info("Successfully refreshed ExecutorInfo for executor: "
+            + executor);
+        } catch (TimeoutException e) {
+          wasSuccess = false;
+          logger.error("Timed out while waiting for ExecutorInfo refresh"
+            + executor, e);
+        } catch (Exception e) {
+          wasSuccess = false;
+          logger.error("Failed to update ExecutorInfo for executor : "
+            + executor, e);
+        }
       }
 
-      taskExecutors.shutdown();
-      try {
-        // wait 5 seconds for all executors to be refreshed
-        taskExecutors.awaitTermination(5, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        logger.error("Timed out while waiting for executorInfo refresh", e);
+      // update is successful for all executors
+      if (wasSuccess) {
+        lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
       }
     }
   }
@@ -297,6 +325,34 @@ public class ExecutorManager extends EventHandler implements
       return false;
   }
 
+  /**
+   * Return last Successful ExecutorInfo Refresh for all active executors
+   *
+   * @return
+   */
+  public long getLastSuccessfulExecutorInfoRefresh() {
+    return this.lastSuccessfulExecutorInfoRefresh;
+  }
+
+  /**
+   * Get currently supported Comparators available to use via azkaban.properties
+   *
+   * @return
+   */
+  public Set<String> getAvailableExecutorComparatorNames() {
+    return ExecutorComparator.getAvailableComparatorNames();
+
+  }
+
+  /**
+   * Get currently supported filters available to use via azkaban.properties
+   *
+   * @return
+   */
+  public Set<String> getAvailableExecutorFilterNames() {
+    return ExecutorFilter.getAvailableFilterNames();
+  }
+
   @Override
   public State getExecutorManagerThreadState() {
     return executingManager.getState();
@@ -1143,7 +1199,10 @@ public class ExecutorManager extends EventHandler implements
     List<Pair<String, String>> paramList =
       new ArrayList<Pair<String, String>>();
 
-    paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
+    paramList.add(new Pair<String, String>(action, ""));
+    if(mBean != null) {
+      paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
+    }
 
     String[] hostPortSplit = hostPort.split(":");
     return callExecutorForJsonObject(hostPortSplit[0],
@@ -1777,9 +1836,9 @@ public class ExecutorManager extends EventHandler implements
     }
 
     /* Helper method to fetch  overriding Executor, if a valid user has specifed otherwise return null */
-    private Executor getUserSpecifiedExecutor(ExecutableFlow exflow) {
+    private Executor getUserSpecifiedExecutor(ExecutionOptions options,
+      int executionId) {
       Executor executor = null;
-      ExecutionOptions options = exflow.getExecutionOptions();
       if (options != null
         && options.getFlowParameters() != null
         && options.getFlowParameters().containsKey(
@@ -1795,13 +1854,19 @@ public class ExecutorManager extends EventHandler implements
               .warn(String
                 .format(
                   "User specified executor id: %d for execution id: %d is not active, Looking up db.",
-                  executorId, exflow.getExecutionId()));
+                  executorId, executionId));
             executor = executorLoader.fetchExecutor(executorId);
+            if (executor == null) {
+              logger
+                .warn(String
+                  .format(
+                    "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+                    executorId, executionId));
+            }
           }
-
-        } catch (Exception ex) {
+        } catch (ExecutorManagerException ex) {
           logger.error("Failed to fetch user specified executor for exec_id = "
-            + exflow.getExecutionId(), ex);
+            + executionId, ex);
         }
       }
       return executor;
@@ -1810,7 +1875,9 @@ public class ExecutorManager extends EventHandler implements
     /* Choose Executor for exflow among the available executors */
     private Executor selectExecutor(ExecutableFlow exflow,
       Set<Executor> availableExecutors) {
-      Executor choosenExecutor = getUserSpecifiedExecutor(exflow);
+      Executor choosenExecutor =
+        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+          exflow.getExecutionId());
 
       // If no executor was specified by admin
       if (choosenExecutor == null) {
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index bbd642a..d459ae9 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -78,4 +78,19 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
     return manager.getQueueProcessorThreadState().toString();
   }
 
+  @Override
+  public List<String> getAvailableExecutorComparatorNames() {
+    return new ArrayList<String>(manager.getAvailableExecutorComparatorNames());
+  }
+
+  @Override
+  public List<String> getAvailableExecutorFilterNames() {
+    return new ArrayList<String>(manager.getAvailableExecutorFilterNames());
+  }
+
+  @Override
+  public long getLastSuccessfulExecutorInfoRefresh() {
+    return manager.getLastSuccessfulExecutorInfoRefresh();
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 94012e0..69e401c 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -49,4 +49,13 @@ public interface JmxExecutorManagerMBean {
   @DisplayName("OPERATION: getQueueProcessorThreadState")
   public String getQueueProcessorThreadState();
 
+  @DisplayName("OPERATION: getAvailableExecutorComparatorNames")
+  List<String> getAvailableExecutorComparatorNames();
+
+  @DisplayName("OPERATION: getAvailableExecutorFilterNames")
+  List<String> getAvailableExecutorFilterNames();
+
+  @DisplayName("OPERATION: getLastSuccessfulExecutorInfoRefresh")
+  long getLastSuccessfulExecutorInfoRefresh();
+
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 235989d..8f5df1a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimeZone;
@@ -501,4 +502,18 @@ public class AzkabanExecutorServer {
       return null;
     }
   }
+
+  /**
+   * Returns host:port combination for currently running executor
+   * @return
+   */
+  public String getExecutorHostPort() {
+    String host = "unkownHost";
+    try {
+      host = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (Exception e) {
+      logger.error("Failed to fetch LocalHostName");
+    }
+    return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index da98b99..7eedee4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -271,6 +271,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(logger);
     }
 
+    logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
     logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
         + projectId + " version:" + version);
     if (pipelineExecId != null) {
@@ -840,7 +841,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Configure Azkaban metrics tracking for a new jobRunner instance
-   * 
+   *
    * @param jobRunner
    */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
diff --git a/azkaban-webserver/.gitignore b/azkaban-webserver/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/azkaban-webserver/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 4a693a9..4970fce 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -166,16 +166,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   }
 
   /**
-   * Enables queueProcessor if @param status is true and disables queueProcessor
-   * is @param status is false.
+   * <pre>
+   * Enables queueProcessor if @param status is true
+   * disables queueProcessor if @param status is false.
+   * </pre>
    */
   private void ajaxUpdateQueueProcessor(HttpServletRequest req,
     HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
-    boolean status) {
+    boolean enableQueue) {
     boolean wasSuccess = false;
     if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
       try {
-        if (status) {
+        if (enableQueue) {
           executorManager.enableQueueProcessorThread();
         } else {
           executorManager.disableQueueProcessorThread();
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index d77bc52..27be342 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -41,7 +41,7 @@
                  } else {
                     $('#metricName').empty();
                     for(var index = 0; index < responseData.metricList.length; index++) {
-                      $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+                      $('#metricName').append($('<option value="' + responseData.metricList[index] +'">' + responseData.metricList[index] + '</option>'));
                     }
                   }
                };