azkaban-aplcache

Remove runningFlows cache in ExecutorManager. Added unit

5/3/2017 2:38:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index d041209..2bf7baf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -35,6 +35,9 @@ public interface ExecutorLoader {
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException;
 
+  Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+      throws ExecutorManagerException;
+
   List<ExecutableFlow> fetchFlowHistory(int skip, int num)
       throws ExecutorManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 9ab3504..65834a7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -93,14 +93,12 @@ public class ExecutorManager extends EventHandler implements
 
   private CleanerThread cleanerThread;
 
-  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
-      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
   private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
-      new ConcurrentHashMap<Integer, ExecutableFlow>();
+      new ConcurrentHashMap<>();
 
   QueuedExecutions queuedFlows;
 
-  final private Set<Executor> activeExecutors = new HashSet<Executor>();
+  final private Set<Executor> activeExecutors = new HashSet<>();
   private QueueProcessorThread queueProcessor;
   private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
 
@@ -129,7 +127,6 @@ public class ExecutorManager extends EventHandler implements
     this.azkProps = azkProps;
     this.executorLoader = loader;
     this.setupExecutors();
-    this.loadRunningFlows();
 
     queuedFlows =
         new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
@@ -420,23 +417,23 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
-    HashSet<String> ports = new HashSet<String>();
+    HashSet<String> ports = new HashSet<>();
     for (Executor executor : activeExecutors) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
-    // include executor which were initially active and still has flows running
-    for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
-      .values()) {
-      ExecutionReference ref = running.getFirst();
-      ports.add(ref.getHost() + ":" + ref.getPort());
+    try {
+      // include executor which were initially active and still has flows running
+      for (Pair<ExecutionReference, ExecutableFlow> running :
+          executorLoader.fetchActiveFlows().values()) {
+        ExecutionReference ref = running.getFirst();
+        ports.add(ref.getHost() + ":" + ref.getPort());
+      }
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
     }
     return ports;
   }
 
-  private void loadRunningFlows() throws ExecutorManagerException {
-    runningFlows.putAll(executorLoader.fetchActiveFlows());
-  }
-
   /*
    * load queued flows i.e with active_execution_reference and not assigned to
    * any executor
@@ -469,8 +466,12 @@ public class ExecutorManager extends EventHandler implements
     if (runningCandidate != null) {
       executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
     }
-    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
-      runningFlows.values()));
+    try {
+      executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+          executorLoader.fetchActiveFlows().values()));
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
+    }
     Collections.sort(executionIds);
     return executionIds;
   }
@@ -497,10 +498,13 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
     throws IOException {
-    List<Pair<ExecutableFlow, Executor>> flows =
-      new ArrayList<Pair<ExecutableFlow, Executor>>();
+    List<Pair<ExecutableFlow, Executor>> flows = new ArrayList<>();
     getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
-    getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+    try {
+      getActiveFlowsWithExecutorHelper(flows, executorLoader.fetchActiveFlows().values());
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
+    }
     return flows;
   }
 
@@ -527,9 +531,12 @@ public class ExecutorManager extends EventHandler implements
     isRunning =
       isRunning
         || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
-    isRunning =
-      isRunning
-        || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+    try {
+      isRunning = isRunning || isFlowRunningHelper(projectId, flowId,
+          executorLoader.fetchActiveFlows().values());
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
+    }
     return isRunning;
   }
 
@@ -565,9 +572,13 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
-    ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
+    ArrayList<ExecutableFlow> flows = new ArrayList<>();
     getActiveFlowHelper(flows, queuedFlows.getAllEntries());
-    getActiveFlowHelper(flows, runningFlows.values());
+    try {
+      getActiveFlowHelper(flows, executorLoader.fetchActiveFlows().values());
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
+    }
     return flows;
   }
 
@@ -590,9 +601,13 @@ public class ExecutorManager extends EventHandler implements
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
    */
   public String getRunningFlowIds() {
-    List<Integer> allIds = new ArrayList<Integer>();
+    List<Integer> allIds = new ArrayList<>();
     getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
-    getRunningFlowsIdsHelper(allIds, runningFlows.values());
+    try {
+      getRunningFlowsIdsHelper(allIds, executorLoader.fetchActiveFlows().values());
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
+    }
     Collections.sort(allIds);
     return allIds.toString();
   }
@@ -687,7 +702,7 @@ public class ExecutorManager extends EventHandler implements
   public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
       int length) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
       Pair<String, String> offsetParam =
@@ -712,7 +727,7 @@ public class ExecutorManager extends EventHandler implements
   public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
       int offset, int length, int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
@@ -741,7 +756,7 @@ public class ExecutorManager extends EventHandler implements
   public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
       int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
     if (pair == null) {
       return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
           attempt);
@@ -766,9 +781,9 @@ public class ExecutorManager extends EventHandler implements
   public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
       String jobId, int offset, int length, int attempt)
       throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
-    if (pair != null) {
+    Pair<ExecutionReference, ExecutableFlow> activeFlow =
+        executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+    if (activeFlow != null) {
 
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
@@ -782,7 +797,7 @@ public class ExecutorManager extends EventHandler implements
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+          callExecutorServer(activeFlow.getFirst(), ConnectorParams.METADATA_ACTION,
               typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return JobMetaData.createJobMetaDataFromObject(result);
     } else {
@@ -801,10 +816,10 @@ public class ExecutorManager extends EventHandler implements
   public void cancelFlow(ExecutableFlow exFlow, String userId)
     throws ExecutorManagerException {
     synchronized (exFlow) {
-      if (runningFlows.containsKey(exFlow.getExecutionId())) {
-        Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
-        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+      Pair<ExecutionReference, ExecutableFlow> activeFlow =
+          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+      if(activeFlow != null) {
+        callExecutorServer(activeFlow.getFirst(), ConnectorParams.CANCEL_ACTION,
           userId);
       } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
         queuedFlows.dequeue(exFlow.getExecutionId());
@@ -821,14 +836,14 @@ public class ExecutorManager extends EventHandler implements
   public void resumeFlow(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
-      if (pair == null) {
+      Pair<ExecutionReference, ExecutableFlow> activeFlow =
+          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+      if (activeFlow == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
             + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
+      callExecutorServer(activeFlow.getFirst(), ConnectorParams.RESUME_ACTION, userId);
     }
   }
 
@@ -836,14 +851,14 @@ public class ExecutorManager extends EventHandler implements
   public void pauseFlow(ExecutableFlow exFlow, String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
-      if (pair == null) {
+      Pair<ExecutionReference, ExecutableFlow> activeFlow =
+          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
+      if (activeFlow == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
             + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
+      callExecutorServer(activeFlow.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
     }
   }
 
@@ -901,7 +916,7 @@ public class ExecutorManager extends EventHandler implements
       throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+          executorLoader.fetchActiveFlowByExecId(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1142,6 +1157,7 @@ public class ExecutorManager extends EventHandler implements
       paramList = new ArrayList<Pair<String, String>>();
     }
 
+    // TODO: refactor using Guice, inject ExecutorApiClient in ExecutorManager
     ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
     @SuppressWarnings("unchecked")
     URI uri =
@@ -1240,6 +1256,8 @@ public class ExecutorManager extends EventHandler implements
               new ArrayList<ExecutableFlow>();
           ArrayList<ExecutableFlow> finalizeFlows =
               new ArrayList<ExecutableFlow>();
+          Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
+              executorLoader.fetchActiveFlows();
 
           if (exFlowMap.size() > 0) {
             for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
@@ -1275,15 +1293,15 @@ public class ExecutorManager extends EventHandler implements
               } catch (IOException e) {
                 logger.error(e);
                 for (ExecutableFlow flow : entry.getValue()) {
-                  Pair<ExecutionReference, ExecutableFlow> pair =
-                      runningFlows.get(flow.getExecutionId());
+                  Pair<ExecutionReference, ExecutableFlow> activeFlow =
+                      activeFlows.get(flow.getExecutionId());
 
                   updaterStage =
                       "Failed to get update. Doing some clean up for flow "
-                          + pair.getSecond().getExecutionId();
+                          + flow.getExecutionId();
 
-                  if (pair != null) {
-                    ExecutionReference ref = pair.getFirst();
+                  if (activeFlow != null) {
+                    ExecutionReference ref = activeFlow.getFirst();
                     int numErrors = ref.getNumErrors();
                     if (ref.getNumErrors() < this.numErrors) {
                       ref.setNextCheckTime(System.currentTimeMillis()
@@ -1293,7 +1311,7 @@ public class ExecutorManager extends EventHandler implements
                       logger.error("Evicting flow " + flow.getExecutionId()
                           + ". The executor is unresponsive.");
                       // TODO should send out an unresponsive email here.
-                      finalizeFlows.add(pair.getSecond());
+                      finalizeFlows.add(activeFlow.getSecond());
                     }
                   }
                 }
@@ -1354,7 +1372,7 @@ public class ExecutorManager extends EventHandler implements
 
           synchronized (this) {
             try {
-              if (runningFlows.size() > 0) {
+              if (activeFlows.size() > 0) {
                 this.wait(waitTimeMs);
               } else {
                 this.wait(waitTimeIdleMs);
@@ -1402,7 +1420,6 @@ public class ExecutorManager extends EventHandler implements
       executorLoader.removeActiveExecutableReference(execId);
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
-      runningFlows.remove(execId);
       fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED, new EventData(dsFlow.getStatus())));
       recentlyFinished.put(execId, dsFlow);
 
@@ -1530,15 +1547,15 @@ public class ExecutorManager extends EventHandler implements
           "Response is malformed. Need exec id to update.");
     }
 
-    Pair<ExecutionReference, ExecutableFlow> refPair =
-        this.runningFlows.get(execId);
-    if (refPair == null) {
+    Pair<ExecutionReference, ExecutableFlow> activeFlow =
+        executorLoader.fetchActiveFlowByExecId(execId);
+    if (activeFlow == null) {
       throw new ExecutorManagerException(
           "No running flow found with the execution id. Removing " + execId);
     }
 
-    ExecutionReference ref = refPair.getFirst();
-    ExecutableFlow flow = refPair.getSecond();
+    ExecutionReference ref = activeFlow.getFirst();
+    ExecutableFlow flow = activeFlow.getSecond();
     if (updateData.containsKey("error")) {
       // The flow should be finished here.
       throw new ExecutorManagerException((String) updateData.get("error"), flow);
@@ -1610,27 +1627,31 @@ public class ExecutorManager extends EventHandler implements
   /* Group Executable flow by Executors to reduce number of REST calls */
   private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
     HashMap<Executor, List<ExecutableFlow>> exFlowMap =
-      new HashMap<Executor, List<ExecutableFlow>>();
+      new HashMap<>();
 
-    for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
-      .values()) {
-      ExecutionReference ref = runningFlow.getFirst();
-      ExecutableFlow flow = runningFlow.getSecond();
-      Executor executor = ref.getExecutor();
+    try {
+      for (Pair<ExecutionReference, ExecutableFlow> runningFlow :
+          executorLoader.fetchActiveFlows().values()) {
+        ExecutionReference ref = runningFlow.getFirst();
+        ExecutableFlow flow = runningFlow.getSecond();
+        Executor executor = ref.getExecutor();
+
+        // We can set the next check time to prevent the checking of certain
+        // flows.
+        if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+          continue;
+        }
 
-      // We can set the next check time to prevent the checking of certain
-      // flows.
-      if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
-        continue;
-      }
+        List<ExecutableFlow> flows = exFlowMap.get(executor);
+        if (flows == null) {
+          flows = new ArrayList<>();
+          exFlowMap.put(executor, flows);
+        }
 
-      List<ExecutableFlow> flows = exFlowMap.get(executor);
-      if (flows == null) {
-        flows = new ArrayList<ExecutableFlow>();
-        exFlowMap.put(executor, flows);
+        flows.add(flow);
       }
-
-      flows.add(flow);
+    } catch(ExecutorManagerException e) {
+      logger.error(e);
     }
 
     return exFlowMap;
@@ -1732,10 +1753,6 @@ public class ExecutorManager extends EventHandler implements
     }
     reference.setExecutor(choosenExecutor);
 
-    // move from flow to running flows
-    runningFlows.put(exflow.getExecutionId(),
-      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
     logger.info(String.format(
       "Successfully dispatched exec %d with error count %d",
       exflow.getExecutionId(), reference.getNumErrors()));
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index a8a4eaf..0b79bb7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -219,6 +219,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   }
 
   @Override
+  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+      throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
+
+    try {
+      List<Pair<ExecutionReference, ExecutableFlow>> flows =
+          runner.query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
+              flowHandler, execId);
+      if(flows.isEmpty()) {
+        return null;
+      }
+      else {
+        return flows.get(0);
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows by exec id", e);
+    }
+  }
+
+  @Override
   public int fetchNumExecutableFlows() throws ExecutorManagerException {
     QueryRunner runner = createQueryRunner();
 
@@ -1157,7 +1178,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         "SELECT exec_id, project_id, version, flow_id, job_id, "
             + "start_time, end_time, status, attempt "
             + "FROM execution_jobs WHERE exec_id=? "
-            + "AND job_id=? AND attempt_id=?";
+            + "AND job_id=? AND attempt=?";
     private static String FETCH_EXECUTABLE_NODE_ATTEMPTS =
         "SELECT exec_id, project_id, version, flow_id, job_id, "
             + "start_time, end_time, status, attempt FROM execution_jobs "
@@ -1364,7 +1385,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       }
 
       Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
-          new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+          new HashMap<>();
       do {
         int id = rs.getInt(1);
         int encodingType = rs.getInt(2);
@@ -1410,6 +1431,68 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  private static class FetchActiveExecutableFlowByExecId implements
+      ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+    private static String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
+        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+            + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+            + " FROM execution_flows ex"
+            + " INNER JOIN "
+            + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+            + " INNER JOIN "
+            + " executors et ON ex.executor_id = et.id"
+            + " WHERE ax.exec_id = ?";
+
+    @Override
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+        throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+          new ArrayList<>();
+      do {
+        int id = rs.getInt(1);
+        int encodingType = rs.getInt(2);
+        byte[] data = rs.getBytes(3);
+        String host = rs.getString(4);
+        int port = rs.getInt(5);
+        long updateTime = rs.getLong(6);
+        int executorId = rs.getInt(7);
+        boolean executorStatus = rs.getBoolean(8);
+
+        if (data == null) {
+          logger.error("Found a flow with empty data blob exec_id: " + id);
+        } else {
+          EncodingType encType = EncodingType.fromInteger(encodingType);
+          Object flowObj;
+          try {
+            if (encType == EncodingType.GZIP) {
+              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            ExecutableFlow exFlow =
+                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            Executor executor = new Executor(executorId, host, port, executorStatus);
+            ExecutionReference ref = new ExecutionReference(id, executor);
+            ref.setUpdateTime(updateTime);
+
+            execFlows.add(new Pair<>(ref, exFlow));
+          } catch (IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
   private static class FetchExecutableFlows implements
       ResultSetHandler<List<ExecutableFlow>> {
     private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 62d187b..8f84eca 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,30 +16,36 @@
 
 package azkaban.executor;
 
-import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import azkaban.alert.Alerter;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
 import azkaban.user.User;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
+import static org.mockito.Mockito.*;
 
 /**
  * Test class for executor manager
  */
 public class ExecutorManagerTest {
+  private ExecutorManager manager;
+  private ExecutorLoader loader;
+  private Props props;
+  private User user;
+  private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
+  private ExecutableFlow flow1;
+  private ExecutableFlow flow2;
 
   /* Helper method to create a ExecutorManager Instance */
   private ExecutorManager createMultiExecutorManagerInstance()
@@ -255,4 +261,90 @@ public class ExecutorManagerTest {
 
     Assert.assertFalse(manager.getRunningFlows().contains(flow1));
   }
+
+  /*
+   * Added tests for runningFlows
+   * TODO: When removing queuedFlows cache, will refactor rest of the ExecutorManager test cases
+   */
+  @Test
+  public void testSubmitFlows() throws ExecutorManagerException, IOException {
+    testSetUpForRunningFlows();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    manager.submitExecutableFlow(flow1, user.getUserId());
+    verify(loader).uploadExecutableFlow(flow1);
+    verify(loader).addActiveExecutableReference(any());
+  }
+
+  @Test
+  public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
+    testSetUpForRunningFlows();
+    List<ExecutableFlow> flows = manager.getRunningFlows();
+    for(Pair<ExecutionReference, ExecutableFlow> pair : activeFlows.values()) {
+      Assert.assertTrue(flows.contains(pair.getSecond()));
+    }
+  }
+
+  @Test
+  public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
+    testSetUpForRunningFlows();
+    List<Integer> executions = manager.getRunningFlows(flow1.getProjectId(), flow1.getFlowId());
+    Assert.assertTrue(executions.contains(flow1.getExecutionId()));
+    Assert.assertTrue(manager.isFlowRunning(flow1.getProjectId(), flow1.getFlowId()));
+  }
+
+  @Test
+  public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
+    testSetUpForRunningFlows();
+    List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
+        manager.getActiveFlowsWithExecutor();
+    Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow1,
+        manager.fetchExecutor(flow1.getExecutionId()))));
+    Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow2,
+        manager.fetchExecutor(flow2.getExecutionId()))));
+  }
+
+  @Test
+  public void testFetchAllActiveExecutorServerHosts() throws ExecutorManagerException, IOException {
+    testSetUpForRunningFlows();
+    Set<String> activeExecutorServerHosts = manager.getAllActiveExecutorServerHosts();
+    Executor executor1 = manager.fetchExecutor(flow1.getExecutionId());
+    Executor executor2 = manager.fetchExecutor(flow2.getExecutionId());
+    Assert.assertTrue(activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
+    Assert.assertTrue(activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
+  }
+
+  /*
+   * TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
+   */
+  private void testSetUpForRunningFlows()
+      throws ExecutorManagerException, IOException {
+    loader = mock(ExecutorLoader.class);
+    user = TestUtils.getTestUser();
+    props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    //To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
+    //so that flows will be dispatched to executors.
+    props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+
+    List<Executor> executors = new ArrayList<>();
+    Executor executor1 = new Executor(1, "localhost", 12345, true);
+    Executor executor2 = new Executor(2, "localhost", 12346, true);
+    executors.add(executor1);
+    executors.add(executor2);
+
+    when(loader.fetchActiveExecutors()).thenReturn(executors);
+    manager = new ExecutorManager(props, loader, new HashMap<>());
+
+    flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    flow1.setExecutionId(1);
+    flow2.setExecutionId(2);
+    ExecutionReference ref1 =
+        new ExecutionReference(flow1.getExecutionId(), executor1);
+    ExecutionReference ref2 =
+        new ExecutionReference(flow2.getExecutionId(), executor2);
+    activeFlows.put(flow1.getExecutionId(), new Pair<>(ref1, flow1));
+    activeFlows.put(flow2.getExecutionId(), new Pair<>(ref2, flow2));
+    when(loader.fetchActiveFlows()).thenReturn(activeFlows);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 39ed6d9..3a41b56 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -22,7 +22,6 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,11 +41,8 @@ import org.junit.Test;
 
 import azkaban.database.DataSourceUtils;
 import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
 import azkaban.user.User;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
@@ -55,7 +51,7 @@ public class JdbcExecutorLoaderTest {
   private static boolean testDBExists;
   /* Directory with serialized description of test flows */
   private static final String UNIT_BASE_DIR =
-    "../azkaban-test/src/test/resources/executions";
+    "../azkaban-test/src/test/resources/azkaban/test/executions";
   // @TODO remove this and turn into local host.
   private static final String host = "localhost";
   private static final int port = 3306;
@@ -501,19 +497,26 @@ public class JdbcExecutorLoaderTest {
     ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
-    loader.uploadExecutableFlow(flow);
+    loader.uploadExecutableFlow(flow2);
 
     ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
     loader.addActiveExecutableReference(ref2);
     ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
     loader.addActiveExecutableReference(ref);
 
-    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
-    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+    List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader.fetchQueuedFlows();
+    Assert.assertEquals(2, fetchedQueuedFlows.size());
+    Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
+    Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
 
-    // only running and completed flows
-    Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
-      queuedFlows.toArray());
+    Assert.assertEquals(ref.getExecId(), fetchedFlow1.getFirst().getExecId());
+    Assert.assertEquals(flow.getExecutionId(), fetchedFlow1.getSecond().getExecutionId());
+    Assert.assertEquals(flow.getFlowId(), fetchedFlow1.getSecond().getFlowId());
+    Assert.assertEquals(flow.getProjectId(), fetchedFlow1.getSecond().getProjectId());
+    Assert.assertEquals(ref2.getExecId(), fetchedFlow2.getFirst().getExecId());
+    Assert.assertEquals(flow2.getExecutionId(), fetchedFlow2.getSecond().getExecutionId());
+    Assert.assertEquals(flow2.getFlowId(), fetchedFlow2.getSecond().getFlowId());
+    Assert.assertEquals(flow2.getProjectId(), fetchedFlow2.getSecond().getProjectId());
   }
 
   /* Test all executors fetch from empty executors */
@@ -786,13 +789,15 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
-    Executor executor = new Executor(2, "test", 1, true);
+    Executor executor = loader.addExecutor("test", 1);
+    loader.assignExecutor(executor.getId(), flow1.getExecutionId());
     ExecutionReference ref1 =
         new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
 
     ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow2);
+    loader.assignExecutor(executor.getId(), flow2.getExecutionId());
     ExecutionReference ref2 =
         new ExecutionReference(flow2.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref2);
@@ -839,6 +844,35 @@ public class JdbcExecutorLoaderTest {
     Assert.assertFalse(activeFlows2.containsKey(flow2.getExecutionId()));
   }
 
+  @Test
+  public void testFetchActiveFlowByExecId() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow1);
+    Executor executor = loader.addExecutor("test", 1);
+    loader.assignExecutor(executor.getId(), flow1.getExecutionId());
+    ExecutionReference ref1 = new ExecutionReference(flow1.getExecutionId(), executor);
+    loader.addActiveExecutableReference(ref1);
+
+    Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
+        loader.fetchActiveFlowByExecId(flow1.getExecutionId());
+
+    ExecutionReference execRef1 = activeFlow1.getFirst();
+    ExecutableFlow execFlow1 = activeFlow1.getSecond();
+    Assert.assertNotNull(execRef1);
+    Assert.assertEquals(ref1.getExecId(), execRef1.getExecId());
+    Assert.assertEquals(ref1.getExecutor(), execRef1.getExecutor());
+    Assert.assertNotNull(execFlow1);
+    Assert.assertEquals(flow1.getExecutionId(), execFlow1.getExecutionId());
+    Assert.assertEquals(flow1.getFlowId(), execFlow1.getFlowId());
+    Assert.assertEquals(flow1.getProjectId(), execFlow1.getProjectId());
+    Assert.assertEquals(flow1.getVersion(), execFlow1.getVersion());
+  }
+
   @Ignore @Test
   public void testSmallUploadLog() throws ExecutorManagerException {
     File logDir = new File(UNIT_BASE_DIR + "logtest");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 2c8a95b..728cb16 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -67,6 +67,12 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
+  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+      throws ExecutorManagerException {
+    return activeFlows.get(execId);
+  }
+
+  @Override
   public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
       int skip, int num) throws ExecutorManagerException {
     return null;
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow
index 7197af9..d9d2706 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/exec2.flow
@@ -1,7 +1,7 @@
 {
   "project.id":1,
   "version":2,
-  "id" : "derived-member-data",
+  "id" : "derived-member-data-2",
   "success.email" : [],
   "edges" : [ {
     "source" : "job1",