azkaban-aplcache

AZNewDispatchingLogic - Switch fetching running flows from

12/11/2018 10:48:02 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index fee1499..55da336 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -25,13 +25,14 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,8 +90,13 @@ public class ExecutionController extends EventHandler implements ExecutorManager
 
   @Override
   public Collection<Executor> getAllActiveExecutors() {
-    // Todo: get the executor info from DB
-    return Collections.emptyList();
+    List<Executor> executors = new ArrayList<>();
+    try {
+      executors = this.executorLoader.fetchActiveExecutors();
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get all active executors.", e);
+    }
+    return executors;
   }
 
   @Override
@@ -100,31 +106,88 @@ public class ExecutionController extends EventHandler implements ExecutorManager
 
   @Override
   public Set<String> getPrimaryServerHosts() {
-    // Todo: get the primary executor host info from DB
-    return Collections.emptySet();
+    final HashSet<String> ports = new HashSet<>();
+    try {
+      for (final Executor executor : this.executorLoader.fetchActiveExecutors()) {
+        ports.add(executor.getHost() + ":" + executor.getPort());
+      }
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get primary server hosts.", e);
+    }
+    return ports;
   }
 
   @Override
   public Set<String> getAllActiveExecutorServerHosts() {
-    // Todo: get all executor host info from DB
-    return Collections.emptySet();
+    final Set<String> ports = getPrimaryServerHosts();
+    // include executor which were initially active and still has flows running
+    try {
+      for (final Pair<ExecutionReference, ExecutableFlow> running : this.executorLoader
+          .fetchActiveFlows().values()) {
+        final ExecutionReference ref = running.getFirst();
+        if (ref.getExecutor().isPresent()) {
+          final Executor executor = ref.getExecutor().get();
+          ports.add(executor.getHost() + ":" + executor.getPort());
+        }
+      }
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get all active executor server hosts.", e);
+    }
+    return ports;
   }
 
   /**
-   * Gets a list of all the active (running flows and non-dispatched flows) executions for a given
-   * project and flow from database. {@inheritDoc}
+   * Gets a list of all the unfinished (both dispatched and non-dispatched) executions for a
+   * given project and flow {@inheritDoc}.
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int, java.lang.String)
    */
   @Override
   public List<Integer> getRunningFlows(final int projectId, final String flowId) {
-    // Todo: get running flows from DB
-    return Collections.emptyList();
+    final List<Integer> executionIds = new ArrayList<>();
+    try {
+      executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+          this.executorLoader.fetchUnfinishedFlows().values()));
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get running flows for project " + projectId + ", flow "
+          + flowId, e);
+    }
+    return executionIds;
+  }
+
+  /* Helper method for getRunningFlows */
+  private List<Integer> getRunningFlowsHelper(final int projectId, final String flowId,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    final List<Integer> executionIds = new ArrayList<>();
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      if (ref.getSecond().getFlowId().equals(flowId)
+          && ref.getSecond().getProjectId() == projectId) {
+        executionIds.add(ref.getFirst().getExecId());
+      }
+    }
+    return executionIds;
   }
 
   @Override
   public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
       throws IOException {
-    // Todo: get active flows with executor from DB
-    return Collections.emptyList();
+    final List<Pair<ExecutableFlow, Optional<Executor>>> flows = new ArrayList<>();
+    try {
+      getActiveFlowsWithExecutorHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get active flows with executor.", e);
+    }
+    return flows;
+  }
+
+  /* Helper method for getActiveFlowsWithExecutor */
+  private void getActiveFlowsWithExecutorHelper(
+      final List<Pair<ExecutableFlow, Optional<Executor>>> flows,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      flows.add(new Pair<>(ref.getSecond(), ref
+          .getFirst().getExecutor()));
+    }
   }
 
   /**
@@ -133,8 +196,29 @@ public class ExecutionController extends EventHandler implements ExecutorManager
    */
   @Override
   public boolean isFlowRunning(final int projectId, final String flowId) {
-    // Todo: check DB to see if flow is running
-    return true;
+    boolean isRunning = false;
+    try {
+      isRunning = isFlowRunningHelper(projectId, flowId,
+          this.executorLoader.fetchUnfinishedFlows().values());
+
+    } catch (final ExecutorManagerException e) {
+      this.logger.error(
+          "Failed to check if the flow is running for project " + projectId + ", flow " + flowId,
+          e);
+    }
+    return isRunning;
+  }
+
+  /* Search a running flow in a collection */
+  private boolean isFlowRunningHelper(final int projectId, final String flowId,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      if (ref.getSecond().getProjectId() == projectId
+          && ref.getSecond().getFlowId().equals(flowId)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
@@ -151,8 +235,24 @@ public class ExecutionController extends EventHandler implements ExecutorManager
    */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
-    // Todo: get running flows from DB
-    return Collections.emptyList();
+    final ArrayList<ExecutableFlow> flows = new ArrayList<>();
+    try {
+      getActiveFlowHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get running flows.", e);
+    }
+    return flows;
+  }
+
+  /**
+   * Helper method to get all running flows from a Pair<ExecutionReference,
+   * ExecutableFlow collection
+   */
+  private void getActiveFlowHelper(final ArrayList<ExecutableFlow> flows,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      flows.add(ref.getSecond());
+    }
   }
 
   @Override
@@ -211,22 +311,77 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   @Override
   public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
       final int length) throws ExecutorManagerException {
-    // Todo: get the flow log from executor if the flow is running, else get it from DB.
-    return new LogData(0, 0, "dummy");
+    final Pair<ExecutionReference, ExecutableFlow> pair = this.executorLoader
+        .fetchActiveFlowByExecId(exFlow.getExecutionId());
+    if (pair != null) {
+      final Pair<String, String> typeParam = new Pair<>("type", "flow");
+      final Pair<String, String> offsetParam =
+          new Pair<>("offset", String.valueOf(offset));
+      final Pair<String, String> lengthParam =
+          new Pair<>("length", String.valueOf(length));
+
+      @SuppressWarnings("unchecked") final Map<String, Object> result =
+          this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
+              typeParam, offsetParam, lengthParam);
+      return LogData.createLogDataFromObject(result);
+    } else {
+      final LogData value =
+          this.executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
+              length);
+      return value;
+    }
   }
 
   @Override
   public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
       final int offset, final int length, final int attempt) throws ExecutorManagerException {
-    // Todo: get the job log from executor if the flow is running, else get it from DB.
-    return new LogData(0, 0, "dummy");
+    final Pair<ExecutionReference, ExecutableFlow> pair = this.executorLoader
+        .fetchActiveFlowByExecId(exFlow.getExecutionId());
+    if (pair != null) {
+      final Pair<String, String> typeParam = new Pair<>("type", "job");
+      final Pair<String, String> jobIdParam =
+          new Pair<>("jobId", jobId);
+      final Pair<String, String> offsetParam =
+          new Pair<>("offset", String.valueOf(offset));
+      final Pair<String, String> lengthParam =
+          new Pair<>("length", String.valueOf(length));
+      final Pair<String, String> attemptParam =
+          new Pair<>("attempt", String.valueOf(attempt));
+
+      @SuppressWarnings("unchecked") final Map<String, Object> result =
+          this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.LOG_ACTION,
+              typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+      return LogData.createLogDataFromObject(result);
+    } else {
+      final LogData value =
+          this.executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+              offset, length);
+      return value;
+    }
   }
 
   @Override
   public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
       final int attempt) throws ExecutorManagerException {
-    // Todo: get execution job status from executor if the flow is running, else get if from DB.
-    return Collections.emptyList();
+    final Pair<ExecutionReference, ExecutableFlow> pair =
+        this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+    if (pair == null) {
+      return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
+          attempt);
+    }
+
+    final Pair<String, String> jobIdParam = new Pair<>("jobId", jobId);
+    final Pair<String, String> attemptParam =
+        new Pair<>("attempt", String.valueOf(attempt));
+
+    @SuppressWarnings("unchecked") final Map<String, Object> result =
+        this.apiGateway.callWithReference(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+            jobIdParam, attemptParam);
+
+    @SuppressWarnings("unchecked") final List<Object> jobStats = (List<Object>) result
+        .get("attachments");
+
+    return jobStats;
   }
 
   @Override
@@ -249,19 +404,83 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   @Override
   public void resumeFlow(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
-    // Todo: call executor to resume the flow
+    synchronized (exFlow) {
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+      if (pair == null) {
+        throw new ExecutorManagerException("Execution "
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
+      }
+      this.apiGateway
+          .callWithReferenceByUser(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+    }
   }
 
   @Override
   public void pauseFlow(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
-    // Todo: call executor to pause the flow
+    synchronized (exFlow) {
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+      if (pair == null) {
+        throw new ExecutorManagerException("Execution "
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
+      }
+      this.apiGateway
+          .callWithReferenceByUser(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+    }
   }
 
   @Override
   public void retryFailures(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
-    // Todo: call executor to retry failed flows
+    modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<String, Object> modifyExecutingJobs(final ExecutableFlow exFlow,
+      final String command, final String userId, final String... jobIds)
+      throws ExecutorManagerException {
+    synchronized (exFlow) {
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+      if (pair == null) {
+        throw new ExecutorManagerException("Execution "
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
+      }
+
+      final Map<String, Object> response;
+      if (jobIds != null && jobIds.length > 0) {
+        for (final String jobId : jobIds) {
+          if (!jobId.isEmpty()) {
+            final ExecutableNode node = exFlow.getExecutableNode(jobId);
+            if (node == null) {
+              throw new ExecutorManagerException("Job " + jobId
+                  + " doesn't exist in execution " + exFlow.getExecutionId()
+                  + ".");
+            }
+          }
+        }
+        final String ids = StringUtils.join(jobIds, ',');
+        response =
+            this.apiGateway.callWithReferenceByUser(pair.getFirst(),
+                ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+                new Pair<>(
+                    ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+                new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+      } else {
+        response =
+            this.apiGateway.callWithReferenceByUser(pair.getFirst(),
+                ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+                new Pair<>(
+                    ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+      }
+
+      return response;
+    }
   }
 
   /**
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 87ff41a..8827344 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -39,6 +39,12 @@ public interface ExecutorLoader {
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException;
 
+  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+      throws ExecutorManagerException;
+
+  Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+      throws ExecutorManagerException;
+
   List<ExecutableFlow> fetchFlowHistory(int skip, int num)
       throws ExecutorManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 11ff709..628c369 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -44,22 +44,111 @@ public class FetchActiveFlowDao {
     this.dbOperator = dbOperator;
   }
 
+  private static Pair<ExecutionReference, ExecutableFlow> getExecutableFlowHelper(
+      final ResultSet rs) throws SQLException {
+    final int id = rs.getInt(1);
+    final int encodingType = rs.getInt(2);
+    final byte[] data = rs.getBytes(3);
+    final String host = rs.getString(4);
+    final int port = rs.getInt(5);
+    final int executorId = rs.getInt(6);
+    final boolean executorStatus = rs.getBoolean(7);
+
+    if (data == null) {
+      logger.warn("Execution id " + id + " has flow_data = null. To clean up, update status to "
+          + "FAILED manually, eg. "
+          + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
+    } else {
+      final EncodingType encType = EncodingType.fromInteger(encodingType);
+      try {
+        final ExecutableFlow exFlow =
+            ExecutableFlow.createExecutableFlowFromObject(
+                GZIPUtils.transformBytesToObject(data, encType));
+        final Executor executor;
+        if (host == null) {
+          logger.warn("Executor id " + executorId + " (on execution " +
+              id + ") wasn't found");
+          executor = null;
+        } else {
+          executor = new Executor(executorId, host, port, executorStatus);
+        }
+        final ExecutionReference ref = new ExecutionReference(id, executor);
+        return new Pair<>(ref, exFlow);
+      } catch (final IOException e) {
+        throw new SQLException("Error retrieving flow data " + id, e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Fetch flows that are not in finished status, including both dispatched and non-dispatched
+   * flows.
+   *
+   * @return unfinished flows map
+   * @throws ExecutorManagerException the executor manager exception
+   */
+  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_UNFINISHED_EXECUTABLE_FLOWS,
+          new FetchActiveExecutableFlows());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching unfinished flows", e);
+    }
+  }
+
+  /**
+   * Fetch flows that are dispatched and not yet finished.
+   *
+   * @return active flows map
+   * @throws ExecutorManagerException the executor manager exception
+   */
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
     try {
-      return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+      return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOWS,
           new FetchActiveExecutableFlows());
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
   }
 
+  /**
+   * Fetch the flow that is dispatched and not yet finished by execution id.
+   *
+   * @return active flow pair
+   * @throws ExecutorManagerException the executor manager exception
+   */
+  Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchActiveExecutableFlow
+              .FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXEC_ID,
+          new FetchActiveExecutableFlow(), execId);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flow by exec id" + execId, e);
+    }
+  }
+
   @VisibleForTesting
   static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
 
-    // Select running and executor assigned flows
-    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
+    // Select flows that are not in finished status
+    private static final String FETCH_UNFINISHED_EXECUTABLE_FLOWS =
+        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+            + "et.port port, ex.executor_id executorId, et.active executorStatus"
+            + " FROM execution_flows ex"
+            + " LEFT JOIN "
+            + " executors et ON ex.executor_id = et.id"
+            + " Where ex.status NOT IN ("
+            + Status.SUCCEEDED.getNumVal() + ", "
+            + Status.KILLED.getNumVal() + ", "
+            + Status.FAILED.getNumVal() + ")";
+
+    // Select flows that are dispatched and not in finished status
+    private static final String FETCH_ACTIVE_EXECUTABLE_FLOWS =
         "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
             + "et.port port, ex.executor_id executorId, et.active executorStatus"
             + " FROM execution_flows ex"
@@ -86,36 +175,9 @@ public class FetchActiveFlowDao {
       final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
           new HashMap<>();
       do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-        final String host = rs.getString(4);
-        final int port = rs.getInt(5);
-        final int executorId = rs.getInt(6);
-        final boolean executorStatus = rs.getBoolean(7);
-
-        if (data == null) {
-          logger.warn("Execution id " + id + " has flow_data=null. To clean up, update status to "
-              + "FAILED manually, eg. "
-              + "SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id);
-        } else {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          try {
-            final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(
-                    GZIPUtils.transformBytesToObject(data, encType));
-            final Executor executor;
-            if (host == null) {
-              logger.warn("Executor id " + executorId + " (on execution " + id + ") wasn't found");
-              executor = null;
-            } else {
-              executor = new Executor(executorId, host, port, executorStatus);
-            }
-            final ExecutionReference ref = new ExecutionReference(id, executor);
-            execFlows.put(id, new Pair<>(ref, exFlow));
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
+        final Pair<ExecutionReference, ExecutableFlow> exFlow = getExecutableFlowHelper(rs);
+        if (exFlow != null) {
+          execFlows.put(rs.getInt(1), exFlow);
         }
       } while (rs.next());
 
@@ -123,4 +185,35 @@ public class FetchActiveFlowDao {
     }
   }
 
+  private static class FetchActiveExecutableFlow implements
+      ResultSetHandler<Pair<ExecutionReference, ExecutableFlow>> {
+
+    // Select the flow that is dispatched and not in finished status by execution id
+    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXEC_ID =
+        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+            + "et.port port, ex.executor_id executorId, et.active executorStatus"
+            + " FROM execution_flows ex"
+            + " LEFT JOIN "
+            + " executors et ON ex.executor_id = et.id"
+            + " Where ex.exec_id = ? AND ex.status NOT IN ("
+            + Status.SUCCEEDED.getNumVal() + ", "
+            + Status.KILLED.getNumVal() + ", "
+            + Status.FAILED.getNumVal() + ")"
+            // exclude queued flows that haven't been assigned yet -- this is the opposite of
+            // the condition in ExecutionFlowDao#FETCH_QUEUED_EXECUTABLE_FLOW
+            + " AND NOT ("
+            + "   ex.executor_id IS NULL"
+            + "   AND ex.status = " + Status.PREPARING.getNumVal()
+            + " )";
+
+    @Override
+    public Pair<ExecutionReference, ExecutableFlow> handle(
+        final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return null;
+      }
+      return getExecutableFlowHelper(rs);
+    }
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 683cc65..13217e4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -97,11 +97,22 @@ public class JdbcExecutorLoader implements ExecutorLoader {
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
-
     return this.fetchActiveFlowDao.fetchActiveFlows();
   }
 
   @Override
+  public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+      throws ExecutorManagerException {
+    return this.fetchActiveFlowDao.fetchUnfinishedFlows();
+  }
+
+  @Override
+  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
+      throws ExecutorManagerException {
+    return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
+  }
+
+  @Override
   public int fetchNumExecutableFlows() throws ExecutorManagerException {
     return this.numExecutionsDao.fetchNumExecutableFlows();
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
new file mode 100644
index 0000000..a41e2db
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -0,0 +1,126 @@
+/*
+* Copyright 2018 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.utils.Pair;
+import azkaban.utils.TestUtils;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExecutionControllerTest {
+
+  private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+  private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = new
+      HashMap<>();
+  private List<Executor> activeExecutors = new ArrayList<>();
+  private List<Executor> allExecutors = new ArrayList<>();
+  private ExecutionController controller;
+  private ExecutorLoader loader;
+  private ExecutorApiGateway apiGateway;
+
+  @Before
+  public void setup() throws Exception {
+    this.loader = mock(ExecutorLoader.class);
+    this.apiGateway = mock(ExecutorApiGateway.class);
+    this.controller = new ExecutionController(this.loader, this.apiGateway);
+
+    final Executor executor1 = new Executor(1, "localhost", 12345, true);
+    final Executor executor2 = new Executor(2, "localhost", 12346, true);
+    final Executor executor3 = new Executor(3, "localhost", 12347, false);
+    this.activeExecutors = ImmutableList.of(executor1, executor2);
+    this.allExecutors = ImmutableList.of(executor1, executor2, executor3);
+    when(this.loader.fetchActiveExecutors()).thenReturn(this.activeExecutors);
+
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow3 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
+    flow1.setExecutionId(1);
+    flow2.setExecutionId(2);
+    flow3.setExecutionId(3);
+    final ExecutionReference ref1 =
+        new ExecutionReference(flow1.getExecutionId(), null);
+    final ExecutionReference ref2 =
+        new ExecutionReference(flow2.getExecutionId(), executor2);
+    final ExecutionReference ref3 =
+        new ExecutionReference(flow3.getExecutionId(), executor3);
+
+    this.activeFlows = ImmutableMap
+        .of(flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(),
+            new Pair<>(ref3, flow3));
+    when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
+
+    this.unfinishedFlows = ImmutableMap.of(flow1.getExecutionId(), new Pair<>(ref1, flow1),
+        flow2.getExecutionId(), new Pair<>(ref2, flow2), flow3.getExecutionId(), new Pair<>(ref3,
+            flow3));
+    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+  }
+
+  @After
+  public void tearDown() {
+    if (this.controller != null) {
+      this.controller.shutdown();
+    }
+  }
+
+  @Test
+  public void testFetchAllActiveFlows() throws Exception {
+    final List<ExecutableFlow> flows = this.controller.getRunningFlows();
+    this.unfinishedFlows.values()
+        .forEach(pair -> assertThat(flows.contains(pair.getSecond())).isTrue());
+  }
+
+  @Test
+  public void testFetchActiveFlowByProject() throws Exception {
+    final ExecutableFlow flow2 = this.unfinishedFlows.get(2).getSecond();
+    final ExecutableFlow flow3 = this.unfinishedFlows.get(3).getSecond();
+    final List<Integer> executions = this.controller.getRunningFlows(flow2.getProjectId(), flow2
+        .getFlowId());
+    assertThat(executions.contains(flow2.getExecutionId())).isTrue();
+    assertThat(executions.contains(flow3.getExecutionId())).isTrue();
+    assertThat(this.controller.isFlowRunning(flow2.getProjectId(), flow2.getFlowId())).isTrue();
+    assertThat(this.controller.isFlowRunning(flow3.getProjectId(), flow3.getFlowId())).isTrue();
+  }
+
+  @Test
+  public void testFetchActiveFlowWithExecutor() throws Exception {
+    final List<Pair<ExecutableFlow, Optional<Executor>>> activeFlowsWithExecutor =
+        this.controller.getActiveFlowsWithExecutor();
+    this.unfinishedFlows.values().forEach(pair -> assertThat(activeFlowsWithExecutor
+        .contains(new Pair<>(pair.getSecond(), pair.getFirst().getExecutor()))).isTrue());
+  }
+
+  @Test
+  public void testFetchAllActiveExecutorServerHosts() throws Exception {
+    final Set<String> activeExecutorServerHosts = this.controller.getAllActiveExecutorServerHosts();
+    assertThat(activeExecutorServerHosts.size()).isEqualTo(3);
+    this.allExecutors.forEach(executor -> assertThat(
+        activeExecutorServerHosts.contains(executor.getHost() + ":" + executor.getPort()))
+        .isTrue());
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 412907a..35ad37f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -29,6 +29,7 @@ import azkaban.user.User;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -311,9 +312,50 @@ public class ExecutionFlowDaoTest {
 
   @Test
   public void testFetchActiveFlowsExecutorAssigned() throws Exception {
+    final List<ExecutableFlow> flows = createExecutions();
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = this.fetchActiveFlowDao
+        .fetchActiveFlows();
+    assertFound(activeFlows, flows.get(0), true);
+    assertNotFound(activeFlows, flows.get(1), "Returned a queued execution");
+    assertFound(activeFlows, flows.get(2), true);
+    assertNotFound(activeFlows, flows.get(3), "Returned an execution with a finished status");
+    assertFound(activeFlows, flows.get(4), false);
+    assertTwoFlowSame(activeFlows.get(flows.get(0).getExecutionId()).getSecond(), flows.get(0));
+  }
 
-    final Executor executor = this.executorDao.addExecutor("test", 1);
+  @Test
+  public void testFetchUnfinishedFlows() throws Exception {
+    final List<ExecutableFlow> flows = createExecutions();
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows =
+        this.fetchActiveFlowDao.fetchUnfinishedFlows();
+    assertFound(unfinishedFlows, flows.get(0), true);
+    assertFound(unfinishedFlows, flows.get(1), false);
+    assertFound(unfinishedFlows, flows.get(2), true);
+    assertNotFound(unfinishedFlows, flows.get(3), "Returned an execution with a finished status");
+    assertFound(unfinishedFlows, flows.get(4), false);
+    assertTwoFlowSame(unfinishedFlows.get(flows.get(0).getExecutionId()).getSecond(), flows.get(0));
+  }
+
+  @Test
+  public void testFetchActiveFlowByExecId() throws Exception {
+    final List<ExecutableFlow> flows = createExecutions();
+    assertTwoFlowSame(
+        this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(0).getExecutionId()).getSecond(),
+        flows.get(0));
+    assertThat(this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(1).getExecutionId()))
+        .isNull();
+    assertTwoFlowSame(
+        this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(2).getExecutionId()).getSecond(),
+        flows.get(2));
+    assertThat(this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(3).getExecutionId()))
+        .isNull();
+    assertTwoFlowSame(
+        this.fetchActiveFlowDao.fetchActiveFlowByExecId(flows.get(4).getExecutionId()).getSecond(),
+        flows.get(4));
+  }
 
+  private List<ExecutableFlow> createExecutions() throws Exception {
+    final Executor executor = this.executorDao.addExecutor("test", 1);
     final ExecutableFlow flow1 = createExecutionAndAssign(Status.PREPARING, executor);
     // flow2 is not assigned
     final ExecutableFlow flow2 = createExecution(Status.PREPARING);
@@ -324,19 +366,7 @@ public class ExecutionFlowDaoTest {
     // flow5 is assigned to an executor that is then removed
     final ExecutableFlow flow5 = createExecutionAndAssign(Status.RUNNING, executor2);
     this.executorDao.removeExecutor(executor2.getHost(), executor2.getPort());
-
-    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
-        this.fetchActiveFlowDao.fetchActiveFlows();
-
-    assertFound(activeFlows, flow1, true);
-    assertNotFound(activeFlows, flow2, "Returned a queued execution");
-    assertFound(activeFlows, flow3, true);
-    assertNotFound(activeFlows, flow4, "Returned an execution with a finished status");
-    assertFound(activeFlows, flow5, false);
-
-    final ExecutableFlow flow1Result =
-        activeFlows.get(flow1.getExecutionId()).getSecond();
-    assertTwoFlowSame(flow1Result, flow1);
+    return ImmutableList.of(flow1, flow2, flow3, flow4, flow5);
   }
 
   private void assertNotFound(
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 3973d0b..8033629 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -76,6 +76,17 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
+  public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchUnfinishedFlows()
+      throws ExecutorManagerException {
+    return new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId) {
+    return new Pair<>(null, null);
+  }
+
+  @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
       final int skip, final int num) throws ExecutorManagerException {
     return null;
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index d42da55..0be6cad 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -184,6 +184,10 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
       final Collection<Executor> executors = this.execManagerAdapter.getAllActiveExecutors();
       page.add("executorList", executors);
 
+      if (executors.isEmpty()) {
+        throw new ExecutorManagerException("Executor list is empty.");
+      }
+
       final Map<String, Object> result =
           this.execManagerAdapter.callExecutorStats(executors.iterator().next().getId(),
               ConnectorParams.STATS_GET_ALLMETRICSNAME,