diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index e622a15..cca687b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -74,9 +74,9 @@ public class ExecutorManager extends EventHandler implements
"azkaban.use.multiple.executors";
private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
"azkaban.webserver.queue.size";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
"azkaban.activeexecutor.refresh.milisecinterval";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
@@ -115,20 +115,19 @@ public class ExecutorManager extends EventHandler implements
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
- public ExecutorManager(Props props, ExecutorLoader loader,
- Map<String, Alerter> alters) throws ExecutorManagerException {
- alerters = alters;
- azkProps = props;
+ public ExecutorManager(Props azkProps, ExecutorLoader loader,
+ Map<String, Alerter> alerters) throws ExecutorManagerException {
+ this.alerters = alerters;
+ this.azkProps = azkProps;
this.executorLoader = loader;
-
this.setupExecutors();
this.loadRunningFlows();
queuedFlows =
- new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+ new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
- cacheDir = new File(props.getString("cache.directory", "cache"));
+ cacheDir = new File(azkProps.getString("cache.directory", "cache"));
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
@@ -138,7 +137,7 @@ public class ExecutorManager extends EventHandler implements
}
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
+ azkProps.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -172,8 +171,8 @@ public class ExecutorManager extends EventHandler implements
queueProcessor =
new QueueProcessorThread(azkProps.getBoolean(
AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 10));
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 1000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 10));
queueProcessor.start();
}
@@ -1734,10 +1733,18 @@ public class ExecutorManager extends EventHandler implements
private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
Executor choosenExecutor) throws ExecutorManagerException {
exflow.setUpdateTime(System.currentTimeMillis());
- callExecutorServer(exflow, choosenExecutor,
- ConnectorParams.EXECUTE_ACTION);
+
executorLoader.assignExecutor(choosenExecutor.getId(),
exflow.getExecutionId());
+ try {
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ } catch (ExecutorManagerException ex) {
+ logger.error("Rolling back executor assignment for execution id:"
+ + exflow.getExecutionId(), ex);
+ executorLoader.unassignExecutor(exflow.getExecutionId());
+ throw new ExecutorManagerException(ex);
+ }
reference.setExecutor(choosenExecutor);
// move from flow to running flows
@@ -1855,14 +1862,14 @@ public class ExecutorManager extends EventHandler implements
} else {
exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
- processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
}
currentContinuousFlowProcessed++;
}
}
/* process flow with a snapshot of available Executors */
- private void processFlow(ExecutionReference reference,
+ private void selectExecutorAndDispatchFlow(ExecutionReference reference,
ExecutableFlow exflow, Set<Executor> availableExecutors)
throws ExecutorManagerException {
synchronized (exflow) {
@@ -1953,7 +1960,7 @@ public class ExecutorManager extends EventHandler implements
} else {
remainingExecutors.remove(lastSelectedExecutor);
// try other executors except chosenExecutor
- processFlow(reference, exflow, remainingExecutors);
+ selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 7740163..d6ffb4b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -1559,4 +1559,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return events;
}
}
+
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+ */
+ @Override
+ public void unassignExecutor(int executionId) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ int rows = runner.update(UPDATE, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to unassign executor for execution : %d ", executionId));
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error updating execution id "
+ + executionId, e);
+ }
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index a7e2b5f..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -335,6 +335,43 @@ public class JdbcExecutorLoaderTest {
}
+ /* Test exception when unassigning an missing execution */
+ @Test
+ public void testUnassignExecutorException() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ loader.unassignExecutor(2);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test happy case when unassigning executor for a flow execution */
+ @Test
+ public void testUnassignExecutor() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ Assert.assertEquals(
+ loader.fetchExecutorByExecutionId(flow.getExecutionId()), executor);
+ loader.unassignExecutor(flow.getExecutionId());
+ Assert.assertEquals(
+ loader.fetchExecutorByExecutionId(flow.getExecutionId()), null);
+ }
+
/* Test exception when assigning a non-existent executor to a flow */
@Test
public void testAssignExecutorInvalidExecutor()