azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index 0897555..f47fe7b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -56,14 +56,6 @@ public class ExecutionReference {
     return execId;
   }
 
-  public String getHost() {
-    return executor.getHost();
-  }
-
-  public int getPort() {
-    return executor.getPort();
-  }
-
   public int getNumErrors() {
     return numErrors;
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3740227..29d99ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -22,6 +22,7 @@ import java.lang.Thread.State;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,24 +62,26 @@ import azkaban.utils.Props;
 public class ExecutorManager extends EventHandler implements
   ExecutorManagerAdapter {
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
-  private ExecutorLoader executorLoader;
+  final private ExecutorLoader executorLoader;
 
-  private CleanerThread cleanerThread;
+  final private CleanerThread cleanerThread;
 
-  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
     new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-  private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
+  final private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
     new ConcurrentHashMap<Integer, ExecutableFlow>();
-  /* all flows ExecutorManager is currently dealing with */
-  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
+
+  /* map to easily access queued flows */
+  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
     new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-  private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+  /* web server side queue */
+  final private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
     new ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>>();
 
-  private Set<Executor> activeExecutors = new HashSet<Executor>();
+  final private Set<Executor> activeExecutors = new HashSet<Executor>();
 
-  private ExecutingManagerUpdaterThread executingManager;
-  private QueueProcessorThread queueProcessor;
+  final private ExecutingManagerUpdaterThread executingManager;
+  final private QueueProcessorThread queueProcessor;
 
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
     * 24 * 60 * 60 * 1000l;
@@ -88,14 +91,18 @@ public class ExecutorManager extends EventHandler implements
   private long lastThreadCheckTime = -1;
   private String updaterStage = "not started";
 
-  private Map<String, Alerter> alerters;
+  final private Map<String, Alerter> alerters;
+
+  final Props azkProps;
 
   File cacheDir;
 
   public ExecutorManager(Props props, ExecutorLoader loader,
     Map<String, Alerter> alters) throws ExecutorManagerException {
+    azkProps = props;
+
     this.executorLoader = loader;
-    this.setupExecutors(props);
+    this.setupExecutors();
     this.loadRunningFlows();
     this.loadQueuedFlows();
 
@@ -114,15 +121,31 @@ public class ExecutorManager extends EventHandler implements
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
-
+    disableQueueProcessorThread();
   }
 
-  /* setup activeExecutors using azkaban.properties and database executors */
-  private void setupExecutors(Props props) throws ExecutorManagerException {
+  /**
+   * <pre>
+   * Setup activeExecutors using azkaban.properties and database executors
+   * Note:
+   * 1. If a local executor is specified and it is missing from db,
+   *    this method add local executor as active in DB
+   * 2. If a local executor is specified and it is marked inactive in db,
+   *    this method will convert local executor as active in DB
+   * 3. If azkaban.use.multiple.executors is set true, this method will
+   *    load all active projects
+   * </pre>
+   *
+   * @throws ExecutorManagerException
+   */
+  public void setupExecutors() throws ExecutorManagerException {
+    // clear all active executors
+    activeExecutors.clear();
+
     // Add local executor, if specified as per properties
-    if (props.containsKey("executor.port")) {
-      String executorHost = props.getString("executor.host", "localhost");
-      int executorPort = props.getInt("executor.port");
+    if (azkProps.containsKey("executor.port")) {
+      String executorHost = azkProps.getString("executor.host", "localhost");
+      int executorPort = azkProps.getInt("executor.port");
       Executor executor =
         executorLoader.fetchExecutor(executorHost, executorPort);
       if (executor == null) {
@@ -135,7 +158,7 @@ public class ExecutorManager extends EventHandler implements
         executorPort, true));
     }
 
-    if (props.getBoolean("azkaban.multiple.executors", false)) {
+    if (azkProps.getBoolean("azkaban.use.multiple.executors", false)) {
       activeExecutors.addAll(executorLoader.fetchActiveExecutors());
     }
 
@@ -144,6 +167,22 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  public void disableQueueProcessorThread() {
+    queueProcessor.setActive(false);
+  }
+
+  public void enableQueueProcessorThread() {
+    queueProcessor.setActive(true);
+  }
+
+  public State getQueueProcessorThreadState() {
+    return queueProcessor.getState();
+  }
+
+  public boolean isQueueProcessorThreadActive() {
+    return queueProcessor.isActive();
+  }
+
   @Override
   public State getExecutorManagerThreadState() {
     return executingManager.getState();
@@ -172,6 +211,12 @@ public class ExecutorManager extends EventHandler implements
     return activeExecutors;
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
+   */
   @Override
   public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
     for (Executor executor : activeExecutors) {
@@ -235,24 +280,29 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public List<Integer> getRunningFlows(int projectId, String flowId) {
     ArrayList<Integer> executionIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
-      if (ref.getSecond().getFlowId().equals(flowId)
-        && ref.getSecond().getProjectId() == projectId) {
-        executionIds.add(ref.getFirst().getExecId());
-      }
-    }
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    getRunningFlowsHelper(projectId, flowId, executionIds,
+      queuedFlowMap.values());
+    getRunningFlowsHelper(projectId, flowId, executionIds,
+      runningFlows.values());
+    return executionIds;
+  }
+
+  /* Helper method for getRunningFlows */
+  private void getRunningFlowsHelper(int projectId, String flowId,
+    ArrayList<Integer> executionIds,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getFlowId().equals(flowId)
         && ref.getSecond().getProjectId() == projectId) {
         executionIds.add(ref.getFirst().getExecId());
       }
     }
-    return executionIds;
   }
 
   /**
    *
    * {@inheritDoc}
+   *
    * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
    */
   @Override
@@ -260,15 +310,19 @@ public class ExecutorManager extends EventHandler implements
     throws IOException {
     List<Pair<ExecutableFlow, Executor>> flows =
       new ArrayList<Pair<ExecutableFlow, Executor>>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
-      flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
-        .getFirst().getExecutor()));
-    }
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    getActiveFlowsWithExecutorHelper(flows, queuedFlowMap.values());
+    getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+    return flows;
+  }
+
+  /* Helper method for getActiveFlowsWithExecutor */
+  private void getActiveFlowsWithExecutorHelper(
+    List<Pair<ExecutableFlow, Executor>> flows,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
         .getFirst().getExecutor()));
     }
-    return flows;
   }
 
   /**
@@ -280,13 +334,20 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public boolean isFlowRunning(int projectId, String flowId) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
-      if (ref.getSecond().getProjectId() == projectId
-        && ref.getSecond().getFlowId().equals(flowId)) {
-        return true;
-      }
-    }
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    boolean isRunning = false;
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, queuedFlowMap.values());
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+    return false;
+  }
+
+  /* Search a running flow in a collection */
+  private boolean isFlowRunningHelper(int projectId, String flowId,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getProjectId() == projectId
         && ref.getSecond().getFlowId().equals(flowId)) {
         return true;
@@ -323,13 +384,20 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public List<ExecutableFlow> getRunningFlows() {
     ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
-      flows.add(ref.getSecond());
-    }
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    getActiveFlowHelper(flows, queuedFlowMap.values());
+    getActiveFlowHelper(flows, runningFlows.values());
+    return flows;
+  }
+
+  /*
+   * Helper method to get all running flows from a Pair<ExecutionReference,
+   * ExecutableFlow collection
+   */
+  private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       flows.add(ref.getSecond());
     }
-    return flows;
   }
 
   /**
@@ -341,16 +409,34 @@ public class ExecutorManager extends EventHandler implements
    */
   public String getRunningFlowIds() {
     List<Integer> allIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
-      allIds.add(ref.getSecond().getExecutionId());
-    }
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
-      allIds.add(ref.getSecond().getExecutionId());
-    }
+    getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
+    getRunningFlowsIdsHelper(allIds, runningFlows.values());
+    Collections.sort(allIds);
+    return allIds.toString();
+  }
+
+  /**
+   * Get execution Ids of all non-dispatched flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
+  public String getQueuedFlowIds() {
+    List<Integer> allIds = new ArrayList<Integer>();
+    getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
     Collections.sort(allIds);
     return allIds.toString();
   }
 
+  /* Helper method to flow ids of all running flows */
+  private void getRunningFlowsIdsHelper(List<Integer> allIds,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      allIds.add(ref.getSecond().getExecutionId());
+    }
+  }
+
   /**
    * Get recently finished flows {@inheritDoc}
    *
@@ -791,7 +877,8 @@ public class ExecutorManager extends EventHandler implements
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
     String action) throws ExecutorManagerException {
     try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+      Executor executor = ref.getExecutor();
+      return callExecutorServer(executor.getHost(), executor.getPort(), action,
         ref.getExecId(), null, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
@@ -801,7 +888,8 @@ public class ExecutorManager extends EventHandler implements
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
     String action, String user) throws ExecutorManagerException {
     try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+      Executor executor = ref.getExecutor();
+      return callExecutorServer(executor.getHost(), executor.getPort(), action,
         ref.getExecId(), user, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
@@ -812,7 +900,8 @@ public class ExecutorManager extends EventHandler implements
     String action, Pair<String, String>... params)
     throws ExecutorManagerException {
     try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+      Executor executor = ref.getExecutor();
+      return callExecutorServer(executor.getHost(), executor.getPort(), action,
         ref.getExecId(), null, params);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
@@ -823,7 +912,8 @@ public class ExecutorManager extends EventHandler implements
     String action, String user, Pair<String, String>... params)
     throws ExecutorManagerException {
     try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+      Executor executor = ref.getExecutor();
+      return callExecutorServer(executor.getHost(), executor.getPort(), action,
         ref.getExecId(), user, params);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
@@ -887,6 +977,7 @@ public class ExecutorManager extends EventHandler implements
   /**
    * Manage servlet call for stats servlet in Azkaban execution server
    * {@inheritDoc}
+   *
    * @throws ExecutorManagerException
    *
    * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
@@ -894,11 +985,13 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public Map<String, Object> callExecutorStats(int executorId, String action,
-    Pair<String, String>... params) throws IOException, ExecutorManagerException {
+    Pair<String, String>... params) throws IOException,
+    ExecutorManagerException {
 
     URIBuilder builder = new URIBuilder();
     Executor executor = fetchExecutor(executorId);
-    builder.setScheme("http").setHost(executor.getHost()).setPort(executor.getPort()).setPath("/stats");
+    builder.setScheme("http").setHost(executor.getHost())
+      .setPort(executor.getPort()).setPath("/stats");
 
     builder.setParameter(ConnectorParams.ACTION_PARAM, action);
 
@@ -1443,11 +1536,20 @@ public class ExecutorManager extends EventHandler implements
     private boolean shutdown = false;
     private long lastProcessingTime = -1;
     private long maxContinousSubmission = 10;
+    private boolean isActive = true;
 
     public QueueProcessorThread() {
       this.setName("AzkabanWebServer-QueueProcessor-Thread");
     }
 
+    public void setActive(boolean isActive) {
+      this.isActive = isActive;
+    }
+
+    public boolean isActive() {
+      return isActive;
+    }
+
     @SuppressWarnings("unused")
     public void shutdown() {
       shutdown = true;
@@ -1458,10 +1560,13 @@ public class ExecutorManager extends EventHandler implements
       while (!shutdown) {
         synchronized (this) {
           try {
-            refreshExecutors();
-            lastCleanerThreadCheckTime = System.currentTimeMillis();
             long currentTime = System.currentTimeMillis();
-            if (currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
+            if (isActive
+              && currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
+              // Refresh executor stats to be used by selector
+              refreshExecutors();
+              // process upto a maximum of maxContinousSubmission from queued
+              // flows
               processQueuedFlows(maxContinousSubmission);
               lastProcessingTime = currentTime;
             }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 08e5534..287e971 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -62,4 +62,20 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
   public String getRunningFlows() {
     return manager.getRunningFlowIds();
   }
+
+  @Override
+  public boolean isQueueProcessorActive() {
+    return manager.isQueueProcessorThreadActive();
+  }
+
+  @Override
+  public String getUndispatchedFlows() {
+    return manager.getQueuedFlowIds();
+  }
+
+  @Override
+  public String getQueueProcessorThreadState() {
+    return manager.getQueueProcessorThreadState().toString();
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 9bc1175..babb2a0 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -39,4 +39,14 @@ public interface JmxExecutorManagerMBean {
 
   @DisplayName("OPERATION: getPrimaryExecutorHostPorts")
   public List<String> getPrimaryExecutorHostPorts();
+
+  @DisplayName("OPERATION: isQueueProcessorActive")
+  public boolean isQueueProcessorActive();
+
+  @DisplayName("OPERATION: getUndispatchedFlows")
+  public String getUndispatchedFlows();
+
+  @DisplayName("OPERATION: getQueueProcessorThreadState")
+  public String getQueueProcessorThreadState();
+
 }
diff --git a/azkaban-webserver/src/web/js/azkaban/view/exflow.js b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
index dbb8ae4..026a946 100644
--- a/azkaban-webserver/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
@@ -178,8 +178,11 @@ azkaban.FlowTabView = Backbone.View.extend({
 		$("#retrybtn").hide();
 
 		if (data.status == "SUCCEEDED") {
-      $("#executebtn").show();
+                        $("#executebtn").show();
 		}
+                else if (data.status == "PREPARING") {
+                        $("#cancelbtn").show();
+                }
 		else if (data.status == "FAILED") {
 			$("#executebtn").show();
 		}