Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index cf3a8c7..ab7a561 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -172,7 +172,12 @@ public interface ExecutorLoader {
throws ExecutorManagerException;
/**
- * Set executorId for a flow execution
+ * <pre>
+ * Set an executor Id to an execution
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception in case executionId or executorId do not exist
+ * </pre>
*
* @param executorId
* @param execId
@@ -182,13 +187,33 @@ public interface ExecutorLoader {
throws ExecutorManagerException;
/**
- * Fetch executorId for a given flow execution
+ * <pre>
+ * Fetches an executor corresponding to a given execution
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executionId
+ * </pre>
*
- * @param execId
- * @return
+ * @param executionId
+ * @return fetched Executor
* @throws ExecutorManagerException
*/
- public int fetchExecutorId(int execId) throws ExecutorManagerException;
+ public Executor fetchExecutorByExecution(int executionId)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch queued flows which have not yet dispatched
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return empty list when no queued execution is found
+ * </pre>
+ *
+ * @return List of queued flows and corresponding execution reference
+ * @throws ExecutorManagerException
+ */
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException;
public boolean updateExecutableReference(int execId, long updateTime)
throws ExecutorManagerException;
@@ -242,12 +267,4 @@ public interface ExecutorLoader {
public int removeExecutionLogsByTime(long millis)
throws ExecutorManagerException;
-
- /**
- * Fetch queued flows which have not yet dispatched
- * @return List of queued flows and corresponding execution reference
- * @throws ExecutorManagerException
- */
- public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
- throws ExecutorManagerException;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 0bc4f39..af0c7be 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -95,11 +95,10 @@ public class ExecutorManager extends EventHandler implements
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
this.executorLoader = loader;
+ this.setupExecutors(props);
this.loadRunningFlows();
this.loadQueuedFlows();
- setupExecutors(props);
-
alerters = alters;
cacheDir = new File(props.getString("cache.directory", "cache"));
@@ -129,10 +128,11 @@ public class ExecutorManager extends EventHandler implements
if (executor == null) {
executor = executorLoader.addExecutor(executorHost, executorPort);
} else if (!executor.isActive()) {
- executorLoader.activateExecutor(executor.getId());
+ executor.setActive(true);
+ executorLoader.updateExecutor(executor);
}
activeExecutors.add(new Executor(executor.getId(), executorHost,
- executorPort));
+ executorPort, true));
}
if (props.getBoolean("azkaban.multiple.executors", false)) {
@@ -190,6 +190,13 @@ public class ExecutorManager extends EventHandler implements
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
+ // Finalize all flows which were running on an executor which is now
+ // inactive
+ for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
+ if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
+ finalizeFlows(pair.getSecond());
+ }
+ }
}
/*
@@ -941,169 +948,6 @@ public class ExecutorManager extends EventHandler implements
executingManager.shutdown();
}
- private class ExecutingManagerUpdaterThread extends Thread {
- private boolean shutdown = false;
-
- public ExecutingManagerUpdaterThread() {
- this.setName("ExecutorManagerUpdaterThread");
- }
-
- // 10 mins recently finished threshold.
- private long recentlyFinishedLifetimeMs = 600000;
- private int waitTimeIdleMs = 2000;
- private int waitTimeMs = 500;
-
- // When we have an http error, for that flow, we'll check every 10 secs, 6
- // times (1 mins) before we evict.
- private int numErrors = 6;
- private long errorThreshold = 10000;
-
- private void shutdown() {
- shutdown = true;
- }
-
- @SuppressWarnings("unchecked")
- public void run() {
- while (!shutdown) {
- try {
- lastThreadCheckTime = System.currentTimeMillis();
- updaterStage = "Starting update all flows.";
-
- Map<Executor, List<ExecutableFlow>> exFlowMap =
- getFlowToExecutorMap();
- ArrayList<ExecutableFlow> finishedFlows =
- new ArrayList<ExecutableFlow>();
- ArrayList<ExecutableFlow> finalizeFlows =
- new ArrayList<ExecutableFlow>();
-
- if (exFlowMap.size() > 0) {
- for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
- .entrySet()) {
- List<Long> updateTimesList = new ArrayList<Long>();
- List<Integer> executionIdsList = new ArrayList<Integer>();
-
- Executor executor = entry.getKey();
-
- updaterStage =
- "Starting update flows on " + executor.getHost() + ":"
- + executor.getPort();
-
- // We pack the parameters of the same host together before we
- // query.
- fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
- updateTimesList);
-
- Pair<String, String> updateTimes =
- new Pair<String, String>(
- ConnectorParams.UPDATE_TIME_LIST_PARAM,
- JSONUtils.toJSON(updateTimesList));
- Pair<String, String> executionIds =
- new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
- JSONUtils.toJSON(executionIdsList));
-
- Map<String, Object> results = null;
- try {
- results =
- callExecutorServer(executor.getHost(), executor.getPort(),
- ConnectorParams.UPDATE_ACTION, null, null, executionIds,
- updateTimes);
- } catch (IOException e) {
- logger.error(e);
- for (ExecutableFlow flow : entry.getValue()) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(flow.getExecutionId());
-
- updaterStage =
- "Failed to get update. Doing some clean up for flow "
- + pair.getSecond().getExecutionId();
-
- if (pair != null) {
- ExecutionReference ref = pair.getFirst();
- int numErrors = ref.getNumErrors();
- if (ref.getNumErrors() < this.numErrors) {
- ref.setNextCheckTime(System.currentTimeMillis()
- + errorThreshold);
- ref.setNumErrors(++numErrors);
- } else {
- logger.error("Evicting flow " + flow.getExecutionId()
- + ". The executor is unresponsive.");
- // TODO should send out an unresponsive email here.
- finalizeFlows.add(pair.getSecond());
- }
- }
- }
- }
-
- // We gets results
- if (results != null) {
- List<Map<String, Object>> executionUpdates =
- (List<Map<String, Object>>) results
- .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
- for (Map<String, Object> updateMap : executionUpdates) {
- try {
- ExecutableFlow flow = updateExecution(updateMap);
-
- updaterStage = "Updated flow " + flow.getExecutionId();
-
- if (isFinished(flow)) {
- finishedFlows.add(flow);
- finalizeFlows.add(flow);
- }
- } catch (ExecutorManagerException e) {
- ExecutableFlow flow = e.getExecutableFlow();
- logger.error(e);
-
- if (flow != null) {
- logger.error("Finalizing flow " + flow.getExecutionId());
- finalizeFlows.add(flow);
- }
- }
- }
- }
- }
-
- updaterStage = "Evicting old recently finished flows.";
-
- evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
- // Add new finished
- for (ExecutableFlow flow : finishedFlows) {
- if (flow.getScheduleId() >= 0
- && flow.getStatus() == Status.SUCCEEDED) {
- ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
- cacheDir);
- }
- fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
- recentlyFinished.put(flow.getExecutionId(), flow);
- }
-
- updaterStage =
- "Finalizing " + finalizeFlows.size() + " error flows.";
-
- // Kill error flows
- for (ExecutableFlow flow : finalizeFlows) {
- finalizeFlows(flow);
- }
- }
-
- updaterStage = "Updated all active flows. Waiting for next round.";
-
- synchronized (this) {
- try {
- if (runningFlows.size() > 0) {
- this.wait(waitTimeMs);
- } else {
- this.wait(waitTimeIdleMs);
- }
- } catch (InterruptedException e) {
- }
- }
- } catch (Exception e) {
- logger.error(e);
- }
- }
- }
- }
-
private void finalizeFlows(ExecutableFlow flow) {
int execId = flow.getExecutionId();
@@ -1391,6 +1235,169 @@ public class ExecutorManager extends EventHandler implements
status);
}
+ private class ExecutingManagerUpdaterThread extends Thread {
+ private boolean shutdown = false;
+
+ public ExecutingManagerUpdaterThread() {
+ this.setName("ExecutorManagerUpdaterThread");
+ }
+
+ // 10 mins recently finished threshold.
+ private long recentlyFinishedLifetimeMs = 600000;
+ private int waitTimeIdleMs = 2000;
+ private int waitTimeMs = 500;
+
+ // When we have an http error, for that flow, we'll check every 10 secs, 6
+ // times (1 mins) before we evict.
+ private int numErrors = 6;
+ private long errorThreshold = 10000;
+
+ private void shutdown() {
+ shutdown = true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ while (!shutdown) {
+ try {
+ lastThreadCheckTime = System.currentTimeMillis();
+ updaterStage = "Starting update all flows.";
+
+ Map<Executor, List<ExecutableFlow>> exFlowMap =
+ getFlowToExecutorMap();
+ ArrayList<ExecutableFlow> finishedFlows =
+ new ArrayList<ExecutableFlow>();
+ ArrayList<ExecutableFlow> finalizeFlows =
+ new ArrayList<ExecutableFlow>();
+
+ if (exFlowMap.size() > 0) {
+ for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+ .entrySet()) {
+ List<Long> updateTimesList = new ArrayList<Long>();
+ List<Integer> executionIdsList = new ArrayList<Integer>();
+
+ Executor executor = entry.getKey();
+
+ updaterStage =
+ "Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort();
+
+ // We pack the parameters of the same host together before we
+ // query.
+ fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+ updateTimesList);
+
+ Pair<String, String> updateTimes =
+ new Pair<String, String>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
+ Pair<String, String> executionIds =
+ new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
+
+ Map<String, Object> results = null;
+ try {
+ results =
+ callExecutorServer(executor.getHost(), executor.getPort(),
+ ConnectorParams.UPDATE_ACTION, null, null, executionIds,
+ updateTimes);
+ } catch (IOException e) {
+ logger.error(e);
+ for (ExecutableFlow flow : entry.getValue()) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(flow.getExecutionId());
+
+ updaterStage =
+ "Failed to get update. Doing some clean up for flow "
+ + pair.getSecond().getExecutionId();
+
+ if (pair != null) {
+ ExecutionReference ref = pair.getFirst();
+ int numErrors = ref.getNumErrors();
+ if (ref.getNumErrors() < this.numErrors) {
+ ref.setNextCheckTime(System.currentTimeMillis()
+ + errorThreshold);
+ ref.setNumErrors(++numErrors);
+ } else {
+ logger.error("Evicting flow " + flow.getExecutionId()
+ + ". The executor is unresponsive.");
+ // TODO should send out an unresponsive email here.
+ finalizeFlows.add(pair.getSecond());
+ }
+ }
+ }
+ }
+
+ // We gets results
+ if (results != null) {
+ List<Map<String, Object>> executionUpdates =
+ (List<Map<String, Object>>) results
+ .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+ for (Map<String, Object> updateMap : executionUpdates) {
+ try {
+ ExecutableFlow flow = updateExecution(updateMap);
+
+ updaterStage = "Updated flow " + flow.getExecutionId();
+
+ if (isFinished(flow)) {
+ finishedFlows.add(flow);
+ finalizeFlows.add(flow);
+ }
+ } catch (ExecutorManagerException e) {
+ ExecutableFlow flow = e.getExecutableFlow();
+ logger.error(e);
+
+ if (flow != null) {
+ logger.error("Finalizing flow " + flow.getExecutionId());
+ finalizeFlows.add(flow);
+ }
+ }
+ }
+ }
+ }
+
+ updaterStage = "Evicting old recently finished flows.";
+
+ evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
+ // Add new finished
+ for (ExecutableFlow flow : finishedFlows) {
+ if (flow.getScheduleId() >= 0
+ && flow.getStatus() == Status.SUCCEEDED) {
+ ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
+ cacheDir);
+ }
+ fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
+ recentlyFinished.put(flow.getExecutionId(), flow);
+ }
+
+ updaterStage =
+ "Finalizing " + finalizeFlows.size() + " error flows.";
+
+ // Kill error flows
+ for (ExecutableFlow flow : finalizeFlows) {
+ finalizeFlows(flow);
+ }
+ }
+
+ updaterStage = "Updated all active flows. Waiting for next round.";
+
+ synchronized (this) {
+ try {
+ if (runningFlows.size() > 0) {
+ this.wait(waitTimeMs);
+ } else {
+ this.wait(waitTimeIdleMs);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+
/*
* This thread is responsible for processing queued flows.
*/
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index f148499..3d14a17 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -996,6 +996,65 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return events;
}
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
+ */
+ @Override
+ public void assignExecutor(int executorId, int executionId)
+ throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ Executor executor = fetchExecutor(executorId);
+ if (executor == null) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign non-existent executor Id: %d to execution : %d ",
+ executorId, executionId));
+ }
+
+ int rows = runner.update(UPDATE, executorId, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign executor Id: %d to non-existent execution : %d ",
+ executorId, executionId));
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error updating executor id "
+ + executorId, e);
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecution(int)
+ */
+ @Override
+ public Executor fetchExecutorByExecution(int executionId)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ Executor executor = null;
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+ executorHandler, executionId);
+ if (executors.size() > 0) {
+ executor = executors.get(0);
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error fetching executor for exec_id : " + executionId, e);
+ }
+ return executor;
+ }
+
private static class LastInsertID implements ResultSetHandler<Long> {
private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@@ -1198,6 +1257,9 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ /**
+ * JDBC ResultSetHandler to fetch queued executions
+ */
private static class FetchQueuedExecutableFlows implements
ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
// Select queued unassigned flows
@@ -1262,8 +1324,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
// Select running and executor assigned flows
private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
- + "et.host host, et.port port, ax.update_time axUpdateTime, et.id executorId"
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
+ " INNER JOIN "
+ " active_executing_flows ax ON ex.exec_id = ax.exec_id"
@@ -1288,6 +1350,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
int port = rs.getInt(5);
long updateTime = rs.getLong(6);
int executorId = rs.getInt(7);
+ boolean executorStatus = rs.getBoolean(8);
if (data == null) {
execFlows.put(id, null);
@@ -1308,7 +1371,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
- Executor executor = new Executor(executorId, host, port);
+ Executor executor = new Executor(executorId, host, port, executorStatus);
ExecutionReference ref = new ExecutionReference(id, executor);
ref.setUpdateTime(updateTime);
@@ -1439,6 +1502,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
"SELECT id, host, port, active FROM executors where id=?";
private static String FETCH_EXECUTOR_BY_HOST_PORT =
"SELECT id, host, port, active FROM executors where host=? AND port=?";
+ private static String FETCH_EXECUTION_EXECUTOR =
+ "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+ + " executors ex INNER JOIN execution_flows ef "
+ + "on ex.id = ef.executor_id where exec_id=?";
@Override
public List<Executor> handle(ResultSet rs) throws SQLException {
@@ -1492,38 +1559,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return events;
}
}
-
- @Override
- public void assignExecutor(int executorId, int execId)
- throws ExecutorManagerException {
- final String UPDATE =
- "UPDATE execution_flows SET executor_id=? where exec_id=?";
-
- QueryRunner runner = createQueryRunner();
- try {
- int rows = runner.update(UPDATE, executorId, execId);
- if (rows == 0) {
- throw new ExecutorManagerException(String.format(
- "Failed to update executor Id: %d to execution : %d ", executorId,
- execId));
- }
- } catch (SQLException e) {
- throw new ExecutorManagerException("Error updating executor id "
- + executorId, e);
- }
- }
-
- @Override
- public int fetchExecutorId(int executionId) throws ExecutorManagerException {
- QueryRunner runner = createQueryRunner();
- IntHandler intHandler = new IntHandler();
- try {
- int executorId =
- runner.query(IntHandler.FETCH_EXECUTOR_ID, intHandler, executionId);
- return executorId;
- } catch (SQLException e) {
- throw new ExecutorManagerException(
- "Error fetching executorId for exec_id : " + executionId, e);
- }
- }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 8cfe03d..ed75086 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.joda.time.DateTime;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -52,14 +54,16 @@ import azkaban.utils.Props;
public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
// @TODO remove this and turn into local host.
- private static final String host = "cyu-ld.linkedin.biz";
+ private static final String host = "localhost";
private static final int port = 3306;
private static final String database = "azkaban2";
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions/";
- private File flowDir = new File("unit/executions/exectest1");
+ private File flowDir = new File(UNIT_BASE_DIR + "/exectest1");
@BeforeClass
public static void setupDB() {
@@ -143,8 +147,8 @@ public class JdbcExecutorLoaderTest {
DbUtils.closeQuietly(connection);
}
- @AfterClass
- public static void clearDB() {
+ @After
+ public void clearDB() {
if (!testDBExists) {
return;
}
@@ -226,8 +230,6 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- Assert.assertEquals(1, 0);
-
ExecutorLoader loader = createLoader();
ExecutableFlow flow = createExecutableFlow("exec1");
@@ -334,6 +336,149 @@ public class JdbcExecutorLoaderTest {
}
+ /* Test exception when assigning a non-existent executor to a flow */
+ @Test
+ public void testAssignExecutorInvalidExecutor()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow = createExecutableFlow("exec1");
+ loader.uploadExecutableFlow(flow);
+ try {
+ loader.assignExecutor(flow.getExecutionId(), 1);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test exception when assigning an executor to a non-existent flow execution */
+ @Test
+ public void testAssignExecutorInvalidExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ try {
+ loader.assignExecutor(2, executor.getId());
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test null return when an invalid execution flows */
+ @Test
+ public void testFetchMissingExecutorByExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Assert.assertEquals(loader.fetchExecutorByExecution(1), null);
+ }
+
+ /* Test null return when for a non-dispatched execution */
+ @Test
+ public void testFetchExecutorByQueuedExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow = createExecutableFlow("exec1");
+ loader.uploadExecutableFlow(flow);
+ Assert.assertEquals(loader.fetchExecutorByExecution(flow.getExecutionId()),
+ null);
+ }
+
+ /* Test happy case when assigning and fetching an executor to a flow execution */
+ @Test
+ public void testAssignAndFetchExecutor() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ ExecutableFlow flow = createExecutableFlow("exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ Assert.assertEquals(loader.fetchExecutorByExecution(flow.getExecutionId()),
+ executor);
+ }
+
+ /* Test fetchQueuedFlows when there are no queued flows */
+ @Test
+ public void testFetchNoQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ loader.fetchQueuedFlows();
+
+ // no execution flows at all i.e. no running, completed or queued flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+
+ String host = "lcoalhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+
+ ExecutableFlow flow = createExecutableFlow("exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ // only completed flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+
+ ExecutableFlow flow2 = createExecutableFlow("exec2");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
+ loader.addActiveExecutableReference(ref);
+ // only running and completed flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+ }
+
+ /* Test fetchQueuedFlows happy case */
+ @Test
+ public void testFetchQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
+
+ ExecutableFlow flow = createExecutableFlow("exec1");
+ loader.uploadExecutableFlow(flow);
+ ExecutableFlow flow2 = createExecutableFlow("exec2");
+ loader.uploadExecutableFlow(flow);
+
+ ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
+ loader.addActiveExecutableReference(ref2);
+ ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
+ loader.addActiveExecutableReference(ref);
+
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+
+ // only running and completed flows
+ Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
+ queuedFlows.toArray());
+ }
/* Test all executors fetch from empty executors */
@Test
@@ -435,7 +580,7 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
try {
String host = "localhost";
- int port = 123456;
+ int port = 12345;
loader.addExecutor(host, port);
loader.addExecutor(host, port);
Assert.fail("Expecting exception, but didn't get one");
@@ -599,7 +744,7 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
ExecutableFlow flow1 = createExecutableFlow("exec1");
loader.uploadExecutableFlow(flow1);
- Executor executor = new Executor(2, "test", 1);
+ Executor executor = new Executor(2, "test", 1, true);
ExecutionReference ref1 =
new ExecutionReference(flow1.getExecutionId(), executor);
loader.addActiveExecutableReference(ref1);
@@ -654,7 +799,7 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testSmallUploadLog() throws ExecutorManagerException {
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
File[] smalllog =
{ new File(logDir, "log1.log"), new File(logDir, "log2.log"),
new File(logDir, "log3.log") };
@@ -679,7 +824,7 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testLargeUploadLog() throws ExecutorManagerException {
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
File[] largelog =
@@ -725,7 +870,7 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
File[] largelog =
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index fbfe625..6969a20 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -345,9 +345,9 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public int fetchExecutorId(int execId) throws ExecutorManagerException {
+ public Executor fetchExecutorByExecution(int execId) throws ExecutorManagerException {
if (executionExecutorMapping.containsKey(execId)) {
- return executionExecutorMapping.get(execId);
+ return fetchExecutor(executionExecutorMapping.get(execId));
} else {
throw new ExecutorManagerException(
"Failed to find executor with execution : " + execId);