azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index cf3a8c7..ab7a561 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -172,7 +172,12 @@ public interface ExecutorLoader {
       throws ExecutorManagerException;
 
   /**
-   * Set executorId for a flow execution
+   * <pre>
+   * Set an executor Id to an execution
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception in case executionId or executorId do not exist
+   * </pre>
    *
    * @param executorId
    * @param execId
@@ -182,13 +187,33 @@ public interface ExecutorLoader {
     throws ExecutorManagerException;
 
   /**
-   * Fetch executorId for a given flow execution
+   * <pre>
+   * Fetches an executor corresponding to a given execution
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executionId
+   * </pre>
    *
-   * @param execId
-   * @return
+   * @param executionId
+   * @return fetched Executor
    * @throws ExecutorManagerException
    */
-  public int fetchExecutorId(int execId) throws ExecutorManagerException;
+  public Executor fetchExecutorByExecution(int executionId)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch queued flows which have not yet dispatched
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return empty list when no queued execution is found
+   * </pre>
+   *
+   * @return List of queued flows and corresponding execution reference
+   * @throws ExecutorManagerException
+   */
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException;
 
   public boolean updateExecutableReference(int execId, long updateTime)
       throws ExecutorManagerException;
@@ -242,12 +267,4 @@ public interface ExecutorLoader {
 
   public int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
-
-  /**
-   * Fetch queued flows which have not yet dispatched
-   * @return List of queued flows and corresponding execution reference
-   * @throws ExecutorManagerException
-   */
-  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
-    throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 0bc4f39..af0c7be 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -95,11 +95,10 @@ public class ExecutorManager extends EventHandler implements
   public ExecutorManager(Props props, ExecutorLoader loader,
     Map<String, Alerter> alters) throws ExecutorManagerException {
     this.executorLoader = loader;
+    this.setupExecutors(props);
     this.loadRunningFlows();
     this.loadQueuedFlows();
 
-    setupExecutors(props);
-
     alerters = alters;
 
     cacheDir = new File(props.getString("cache.directory", "cache"));
@@ -129,10 +128,11 @@ public class ExecutorManager extends EventHandler implements
       if (executor == null) {
         executor = executorLoader.addExecutor(executorHost, executorPort);
       } else if (!executor.isActive()) {
-        executorLoader.activateExecutor(executor.getId());
+        executor.setActive(true);
+        executorLoader.updateExecutor(executor);
       }
       activeExecutors.add(new Executor(executor.getId(), executorHost,
-        executorPort));
+        executorPort, true));
     }
 
     if (props.getBoolean("azkaban.multiple.executors", false)) {
@@ -190,6 +190,13 @@ public class ExecutorManager extends EventHandler implements
 
   private void loadRunningFlows() throws ExecutorManagerException {
     runningFlows.putAll(executorLoader.fetchActiveFlows());
+    // Finalize all flows which were running on an executor which is now
+    // inactive
+    for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
+      if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
+        finalizeFlows(pair.getSecond());
+      }
+    }
   }
 
   /*
@@ -941,169 +948,6 @@ public class ExecutorManager extends EventHandler implements
     executingManager.shutdown();
   }
 
-  private class ExecutingManagerUpdaterThread extends Thread {
-    private boolean shutdown = false;
-
-    public ExecutingManagerUpdaterThread() {
-      this.setName("ExecutorManagerUpdaterThread");
-    }
-
-    // 10 mins recently finished threshold.
-    private long recentlyFinishedLifetimeMs = 600000;
-    private int waitTimeIdleMs = 2000;
-    private int waitTimeMs = 500;
-
-    // When we have an http error, for that flow, we'll check every 10 secs, 6
-    // times (1 mins) before we evict.
-    private int numErrors = 6;
-    private long errorThreshold = 10000;
-
-    private void shutdown() {
-      shutdown = true;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void run() {
-      while (!shutdown) {
-        try {
-          lastThreadCheckTime = System.currentTimeMillis();
-          updaterStage = "Starting update all flows.";
-
-          Map<Executor, List<ExecutableFlow>> exFlowMap =
-            getFlowToExecutorMap();
-          ArrayList<ExecutableFlow> finishedFlows =
-            new ArrayList<ExecutableFlow>();
-          ArrayList<ExecutableFlow> finalizeFlows =
-            new ArrayList<ExecutableFlow>();
-
-          if (exFlowMap.size() > 0) {
-            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
-              .entrySet()) {
-              List<Long> updateTimesList = new ArrayList<Long>();
-              List<Integer> executionIdsList = new ArrayList<Integer>();
-
-              Executor executor = entry.getKey();
-
-              updaterStage =
-                "Starting update flows on " + executor.getHost() + ":"
-                  + executor.getPort();
-
-              // We pack the parameters of the same host together before we
-              // query.
-              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
-                updateTimesList);
-
-              Pair<String, String> updateTimes =
-                new Pair<String, String>(
-                  ConnectorParams.UPDATE_TIME_LIST_PARAM,
-                  JSONUtils.toJSON(updateTimesList));
-              Pair<String, String> executionIds =
-                new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
-                  JSONUtils.toJSON(executionIdsList));
-
-              Map<String, Object> results = null;
-              try {
-                results =
-                  callExecutorServer(executor.getHost(), executor.getPort(),
-                    ConnectorParams.UPDATE_ACTION, null, null, executionIds,
-                    updateTimes);
-              } catch (IOException e) {
-                logger.error(e);
-                for (ExecutableFlow flow : entry.getValue()) {
-                  Pair<ExecutionReference, ExecutableFlow> pair =
-                    runningFlows.get(flow.getExecutionId());
-
-                  updaterStage =
-                    "Failed to get update. Doing some clean up for flow "
-                      + pair.getSecond().getExecutionId();
-
-                  if (pair != null) {
-                    ExecutionReference ref = pair.getFirst();
-                    int numErrors = ref.getNumErrors();
-                    if (ref.getNumErrors() < this.numErrors) {
-                      ref.setNextCheckTime(System.currentTimeMillis()
-                        + errorThreshold);
-                      ref.setNumErrors(++numErrors);
-                    } else {
-                      logger.error("Evicting flow " + flow.getExecutionId()
-                        + ". The executor is unresponsive.");
-                      // TODO should send out an unresponsive email here.
-                      finalizeFlows.add(pair.getSecond());
-                    }
-                  }
-                }
-              }
-
-              // We gets results
-              if (results != null) {
-                List<Map<String, Object>> executionUpdates =
-                  (List<Map<String, Object>>) results
-                    .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
-                for (Map<String, Object> updateMap : executionUpdates) {
-                  try {
-                    ExecutableFlow flow = updateExecution(updateMap);
-
-                    updaterStage = "Updated flow " + flow.getExecutionId();
-
-                    if (isFinished(flow)) {
-                      finishedFlows.add(flow);
-                      finalizeFlows.add(flow);
-                    }
-                  } catch (ExecutorManagerException e) {
-                    ExecutableFlow flow = e.getExecutableFlow();
-                    logger.error(e);
-
-                    if (flow != null) {
-                      logger.error("Finalizing flow " + flow.getExecutionId());
-                      finalizeFlows.add(flow);
-                    }
-                  }
-                }
-              }
-            }
-
-            updaterStage = "Evicting old recently finished flows.";
-
-            evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
-            // Add new finished
-            for (ExecutableFlow flow : finishedFlows) {
-              if (flow.getScheduleId() >= 0
-                && flow.getStatus() == Status.SUCCEEDED) {
-                ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
-                  cacheDir);
-              }
-              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
-              recentlyFinished.put(flow.getExecutionId(), flow);
-            }
-
-            updaterStage =
-              "Finalizing " + finalizeFlows.size() + " error flows.";
-
-            // Kill error flows
-            for (ExecutableFlow flow : finalizeFlows) {
-              finalizeFlows(flow);
-            }
-          }
-
-          updaterStage = "Updated all active flows. Waiting for next round.";
-
-          synchronized (this) {
-            try {
-              if (runningFlows.size() > 0) {
-                this.wait(waitTimeMs);
-              } else {
-                this.wait(waitTimeIdleMs);
-              }
-            } catch (InterruptedException e) {
-            }
-          }
-        } catch (Exception e) {
-          logger.error(e);
-        }
-      }
-    }
-  }
-
   private void finalizeFlows(ExecutableFlow flow) {
 
     int execId = flow.getExecutionId();
@@ -1391,6 +1235,169 @@ public class ExecutorManager extends EventHandler implements
       status);
   }
 
+  private class ExecutingManagerUpdaterThread extends Thread {
+    private boolean shutdown = false;
+
+    public ExecutingManagerUpdaterThread() {
+      this.setName("ExecutorManagerUpdaterThread");
+    }
+
+    // 10 mins recently finished threshold.
+    private long recentlyFinishedLifetimeMs = 600000;
+    private int waitTimeIdleMs = 2000;
+    private int waitTimeMs = 500;
+
+    // When we have an http error, for that flow, we'll check every 10 secs, 6
+    // times (1 mins) before we evict.
+    private int numErrors = 6;
+    private long errorThreshold = 10000;
+
+    private void shutdown() {
+      shutdown = true;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void run() {
+      while (!shutdown) {
+        try {
+          lastThreadCheckTime = System.currentTimeMillis();
+          updaterStage = "Starting update all flows.";
+
+          Map<Executor, List<ExecutableFlow>> exFlowMap =
+            getFlowToExecutorMap();
+          ArrayList<ExecutableFlow> finishedFlows =
+            new ArrayList<ExecutableFlow>();
+          ArrayList<ExecutableFlow> finalizeFlows =
+            new ArrayList<ExecutableFlow>();
+
+          if (exFlowMap.size() > 0) {
+            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+              .entrySet()) {
+              List<Long> updateTimesList = new ArrayList<Long>();
+              List<Integer> executionIdsList = new ArrayList<Integer>();
+
+              Executor executor = entry.getKey();
+
+              updaterStage =
+                "Starting update flows on " + executor.getHost() + ":"
+                  + executor.getPort();
+
+              // We pack the parameters of the same host together before we
+              // query.
+              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+                updateTimesList);
+
+              Pair<String, String> updateTimes =
+                new Pair<String, String>(
+                  ConnectorParams.UPDATE_TIME_LIST_PARAM,
+                  JSONUtils.toJSON(updateTimesList));
+              Pair<String, String> executionIds =
+                new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
+                  JSONUtils.toJSON(executionIdsList));
+
+              Map<String, Object> results = null;
+              try {
+                results =
+                  callExecutorServer(executor.getHost(), executor.getPort(),
+                    ConnectorParams.UPDATE_ACTION, null, null, executionIds,
+                    updateTimes);
+              } catch (IOException e) {
+                logger.error(e);
+                for (ExecutableFlow flow : entry.getValue()) {
+                  Pair<ExecutionReference, ExecutableFlow> pair =
+                    runningFlows.get(flow.getExecutionId());
+
+                  updaterStage =
+                    "Failed to get update. Doing some clean up for flow "
+                      + pair.getSecond().getExecutionId();
+
+                  if (pair != null) {
+                    ExecutionReference ref = pair.getFirst();
+                    int numErrors = ref.getNumErrors();
+                    if (ref.getNumErrors() < this.numErrors) {
+                      ref.setNextCheckTime(System.currentTimeMillis()
+                        + errorThreshold);
+                      ref.setNumErrors(++numErrors);
+                    } else {
+                      logger.error("Evicting flow " + flow.getExecutionId()
+                        + ". The executor is unresponsive.");
+                      // TODO should send out an unresponsive email here.
+                      finalizeFlows.add(pair.getSecond());
+                    }
+                  }
+                }
+              }
+
+              // We gets results
+              if (results != null) {
+                List<Map<String, Object>> executionUpdates =
+                  (List<Map<String, Object>>) results
+                    .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+                for (Map<String, Object> updateMap : executionUpdates) {
+                  try {
+                    ExecutableFlow flow = updateExecution(updateMap);
+
+                    updaterStage = "Updated flow " + flow.getExecutionId();
+
+                    if (isFinished(flow)) {
+                      finishedFlows.add(flow);
+                      finalizeFlows.add(flow);
+                    }
+                  } catch (ExecutorManagerException e) {
+                    ExecutableFlow flow = e.getExecutableFlow();
+                    logger.error(e);
+
+                    if (flow != null) {
+                      logger.error("Finalizing flow " + flow.getExecutionId());
+                      finalizeFlows.add(flow);
+                    }
+                  }
+                }
+              }
+            }
+
+            updaterStage = "Evicting old recently finished flows.";
+
+            evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
+            // Add new finished
+            for (ExecutableFlow flow : finishedFlows) {
+              if (flow.getScheduleId() >= 0
+                && flow.getStatus() == Status.SUCCEEDED) {
+                ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
+                  cacheDir);
+              }
+              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
+              recentlyFinished.put(flow.getExecutionId(), flow);
+            }
+
+            updaterStage =
+              "Finalizing " + finalizeFlows.size() + " error flows.";
+
+            // Kill error flows
+            for (ExecutableFlow flow : finalizeFlows) {
+              finalizeFlows(flow);
+            }
+          }
+
+          updaterStage = "Updated all active flows. Waiting for next round.";
+
+          synchronized (this) {
+            try {
+              if (runningFlows.size() > 0) {
+                this.wait(waitTimeMs);
+              } else {
+                this.wait(waitTimeIdleMs);
+              }
+            } catch (InterruptedException e) {
+            }
+          }
+        } catch (Exception e) {
+          logger.error(e);
+        }
+      }
+    }
+  }
+
   /*
    * This thread is responsible for processing queued flows.
    */
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index f148499..3d14a17 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -996,6 +996,65 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return events;
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
+   */
+  @Override
+  public void assignExecutor(int executorId, int executionId)
+    throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      Executor executor = fetchExecutor(executorId);
+      if (executor == null) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to assign non-existent executor Id: %d to execution : %d  ",
+          executorId, executionId));
+      }
+
+      int rows = runner.update(UPDATE, executorId, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to assign executor Id: %d to non-existent execution : %d  ",
+          executorId, executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating executor id "
+        + executorId, e);
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecution(int)
+   */
+  @Override
+  public Executor fetchExecutorByExecution(int executionId)
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    Executor executor = null;
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+          executorHandler, executionId);
+      if (executors.size() > 0) {
+        executor = executors.get(0);
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(
+        "Error fetching executor for exec_id : " + executionId, e);
+    }
+    return executor;
+  }
+
   private static class LastInsertID implements ResultSetHandler<Long> {
     private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
 
@@ -1198,6 +1257,9 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   * JDBC ResultSetHandler to fetch queued executions
+   */
   private static class FetchQueuedExecutableFlows implements
     ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
     // Select queued unassigned flows
@@ -1262,8 +1324,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
     // Select running and executor assigned flows
     private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
-      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
-        + "et.host host, et.port port, ax.update_time axUpdateTime, et.id executorId"
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+        + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
         + " FROM execution_flows ex"
         + " INNER JOIN "
         + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
@@ -1288,6 +1350,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         int port = rs.getInt(5);
         long updateTime = rs.getLong(6);
         int executorId = rs.getInt(7);
+        boolean executorStatus = rs.getBoolean(8);
 
         if (data == null) {
           execFlows.put(id, null);
@@ -1308,7 +1371,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
             ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            Executor executor = new Executor(executorId, host, port);
+            Executor executor = new Executor(executorId, host, port, executorStatus);
             ExecutionReference ref = new ExecutionReference(id, executor);
             ref.setUpdateTime(updateTime);
 
@@ -1439,6 +1502,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       "SELECT id, host, port, active FROM executors where id=?";
     private static String FETCH_EXECUTOR_BY_HOST_PORT =
       "SELECT id, host, port, active FROM executors where host=? AND port=?";
+    private static String FETCH_EXECUTION_EXECUTOR =
+      "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+        + " executors ex INNER JOIN execution_flows ef "
+        + "on ex.id = ef.executor_id  where exec_id=?";
 
     @Override
     public List<Executor> handle(ResultSet rs) throws SQLException {
@@ -1492,38 +1559,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       return events;
     }
   }
-
-  @Override
-  public void assignExecutor(int executorId, int execId)
-    throws ExecutorManagerException {
-    final String UPDATE =
-      "UPDATE execution_flows SET executor_id=? where exec_id=?";
-
-    QueryRunner runner = createQueryRunner();
-    try {
-      int rows = runner.update(UPDATE, executorId, execId);
-      if (rows == 0) {
-        throw new ExecutorManagerException(String.format(
-          "Failed to update executor Id: %d to execution : %d  ", executorId,
-          execId));
-      }
-    } catch (SQLException e) {
-      throw new ExecutorManagerException("Error updating executor id "
-        + executorId, e);
-    }
-  }
-
-  @Override
-  public int fetchExecutorId(int executionId) throws ExecutorManagerException {
-    QueryRunner runner = createQueryRunner();
-    IntHandler intHandler = new IntHandler();
-    try {
-      int executorId =
-        runner.query(IntHandler.FETCH_EXECUTOR_ID, intHandler, executionId);
-      return executorId;
-    } catch (SQLException e) {
-      throw new ExecutorManagerException(
-        "Error fetching executorId for exec_id : " + executionId, e);
-    }
-  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 8cfe03d..ed75086 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.joda.time.DateTime;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -52,14 +54,16 @@ import azkaban.utils.Props;
 public class JdbcExecutorLoaderTest {
   private static boolean testDBExists;
   // @TODO remove this and turn into local host.
-  private static final String host = "cyu-ld.linkedin.biz";
+  private static final String host = "localhost";
   private static final int port = 3306;
   private static final String database = "azkaban2";
   private static final String user = "azkaban";
   private static final String password = "azkaban";
   private static final int numConnections = 10;
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions/";
 
-  private File flowDir = new File("unit/executions/exectest1");
+  private File flowDir = new File(UNIT_BASE_DIR + "/exectest1");
 
   @BeforeClass
   public static void setupDB() {
@@ -143,8 +147,8 @@ public class JdbcExecutorLoaderTest {
     DbUtils.closeQuietly(connection);
   }
 
-  @AfterClass
-  public static void clearDB() {
+  @After
+  public void clearDB() {
     if (!testDBExists) {
       return;
     }
@@ -226,8 +230,6 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    Assert.assertEquals(1, 0);
-
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow = createExecutableFlow("exec1");
 
@@ -334,6 +336,149 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+  /* Test exception when assigning a non-existent executor to a flow */
+  @Test
+  public void testAssignExecutorInvalidExecutor()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow = createExecutableFlow("exec1");
+    loader.uploadExecutableFlow(flow);
+    try {
+      loader.assignExecutor(flow.getExecutionId(), 1);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test exception when assigning an executor to a non-existent flow execution */
+  @Test
+  public void testAssignExecutorInvalidExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    try {
+      loader.assignExecutor(2, executor.getId());
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test null return when an invalid execution flows */
+  @Test
+  public void testFetchMissingExecutorByExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Assert.assertEquals(loader.fetchExecutorByExecution(1), null);
+  }
+
+  /* Test null return when for a non-dispatched execution */
+  @Test
+  public void testFetchExecutorByQueuedExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow = createExecutableFlow("exec1");
+    loader.uploadExecutableFlow(flow);
+    Assert.assertEquals(loader.fetchExecutorByExecution(flow.getExecutionId()),
+      null);
+  }
+
+  /* Test happy case when assigning and fetching an executor to a flow execution */
+  @Test
+  public void testAssignAndFetchExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = createExecutableFlow("exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(loader.fetchExecutorByExecution(flow.getExecutionId()),
+      executor);
+  }
+
+  /* Test fetchQueuedFlows when there are no queued flows */
+  @Test
+  public void testFetchNoQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      loader.fetchQueuedFlows();
+
+    // no execution flows at all i.e. no running, completed or queued flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+
+    String host = "lcoalhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+
+    ExecutableFlow flow = createExecutableFlow("exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    // only completed flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+
+    ExecutableFlow flow2 = createExecutableFlow("exec2");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
+    loader.addActiveExecutableReference(ref);
+    // only running and completed flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+  }
+
+  /* Test fetchQueuedFlows happy case */
+  @Test
+  public void testFetchQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
+
+    ExecutableFlow flow = createExecutableFlow("exec1");
+    loader.uploadExecutableFlow(flow);
+    ExecutableFlow flow2 = createExecutableFlow("exec2");
+    loader.uploadExecutableFlow(flow);
+
+    ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
+    loader.addActiveExecutableReference(ref2);
+    ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
+    loader.addActiveExecutableReference(ref);
+
+    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
+    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+
+    // only running and completed flows
+    Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
+      queuedFlows.toArray());
+  }
 
   /* Test all executors fetch from empty executors */
   @Test
@@ -435,7 +580,7 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     try {
       String host = "localhost";
-      int port = 123456;
+      int port = 12345;
       loader.addExecutor(host, port);
       loader.addExecutor(host, port);
       Assert.fail("Expecting exception, but didn't get one");
@@ -599,7 +744,7 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow1 = createExecutableFlow("exec1");
     loader.uploadExecutableFlow(flow1);
-    Executor executor = new Executor(2, "test", 1);
+    Executor executor = new Executor(2, "test", 1, true);
     ExecutionReference ref1 =
         new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
@@ -654,7 +799,7 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testSmallUploadLog() throws ExecutorManagerException {
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
     File[] smalllog =
         { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
             new File(logDir, "log3.log") };
@@ -679,7 +824,7 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testLargeUploadLog() throws ExecutorManagerException {
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
     File[] largelog =
@@ -725,7 +870,7 @@ public class JdbcExecutorLoaderTest {
 
     ExecutorLoader loader = createLoader();
 
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
     File[] largelog =
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index fbfe625..6969a20 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -345,9 +345,9 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public int fetchExecutorId(int execId) throws ExecutorManagerException {
+  public Executor fetchExecutorByExecution(int execId) throws ExecutorManagerException {
     if (executionExecutorMapping.containsKey(execId)) {
-      return executionExecutorMapping.get(execId);
+      return fetchExecutor(executionExecutorMapping.get(execId));
     } else {
       throw new ExecutorManagerException(
         "Failed to find executor with execution : " + execId);