Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index bc9e9df..b8692e7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -32,6 +32,7 @@ import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
@Singleton
@@ -47,9 +48,14 @@ public class ExecutionFlowDao {
public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
throws ExecutorManagerException {
+
+ final String useExecutorParam =
+ flow.getExecutionOptions().getFlowParameters().get(ExecutionOptions.USE_EXECUTOR);
+ final String executorId = StringUtils.isNotEmpty(useExecutorParam) ? useExecutorParam : null;
+
final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
- + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
- + "values (?,?,?,?,?,?,?)";
+ + "(project_id, flow_id, version, status, submit_time, submit_user, update_time, "
+ + "use_executor) values (?,?,?,?,?,?,?,?)";
final long submitTime = System.currentTimeMillis();
flow.setStatus(Status.PREPARING);
flow.setSubmitTime(submitTime);
@@ -63,7 +69,7 @@ public class ExecutionFlowDao {
final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
- submitTime, flow.getSubmitUser(), submitTime);
+ submitTime, flow.getSubmitUser(), submitTime, executorId);
transOperator.getConnection().commit();
return transOperator.getLastInsertId();
};
@@ -280,12 +286,16 @@ public class ExecutionFlowDao {
}
}
- public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+ public int selectAndUpdateExecution(final int executorId, final boolean isActive)
+ throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ? where exec_id = ?";
+ final String selectExecutionForUpdate = isActive ?
+ SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_ACTIVE :
+ SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_INACTIVE;
final SQLTransaction<Integer> selectAndUpdateExecution = transOperator -> {
- final List<Integer> execIds = transOperator.query(SelectFromExecutionFlows
- .SELECT_EXECUTION_FOR_UPDATE, new SelectFromExecutionFlows());
+ final List<Integer> execIds = transOperator.query(selectExecutionForUpdate,
+ new SelectFromExecutionFlows(), executorId);
int execId = -1;
if (!execIds.isEmpty()) {
@@ -307,10 +317,17 @@ public class ExecutionFlowDao {
public static class SelectFromExecutionFlows implements
ResultSetHandler<List<Integer>> {
- static String SELECT_EXECUTION_FOR_UPDATE = "SELECT exec_id from execution_flows where "
- + "status = " + Status.PREPARING.getNumVal()
- + " and executor_id is NULL and flow_data is NOT NULL ORDER BY submit_time ASC LIMIT 1 FOR "
- + "UPDATE";
+ private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT =
+ "SELECT exec_id from execution_flows WHERE status = " + Status.PREPARING.getNumVal()
+ + " and executor_id is NULL and flow_data is NOT NULL and %s"
+ + " ORDER BY submit_time ASC LIMIT 1 FOR UPDATE";
+
+ public static final String SELECT_EXECUTION_FOR_UPDATE_ACTIVE =
+ String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT,
+ "(use_executor is NULL or use_executor = ?)");
+
+ public static final String SELECT_EXECUTION_FOR_UPDATE_INACTIVE =
+ String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT, "use_executor = ?");
@Override
public List<Integer> handle(final ResultSet rs) throws SQLException {
@@ -406,7 +423,7 @@ public class ExecutionFlowDao {
final byte[] data = rs.getBytes(3);
if (data == null) {
- logger.error("Found a flow with empty data blob exec_id: " + id);
+ ExecutionFlowDao.logger.error("Found a flow with empty data blob exec_id: " + id);
} else {
final EncodingType encType = EncodingType.fromInteger(encodingType);
try {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 5c0e990..9887911 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -281,5 +281,6 @@ public interface ExecutorLoader {
int removeExecutionLogsByTime(long millis)
throws ExecutorManagerException;
- int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException;
+ int selectAndUpdateExecution(final int executorId, boolean isActive)
+ throws ExecutorManagerException;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index da40fb7..6adf826 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -347,7 +347,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
}
@Override
- public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
- return this.executionFlowDao.selectAndUpdateExecution(executorId);
+ public int selectAndUpdateExecution(final int executorId, final boolean isActive)
+ throws ExecutorManagerException {
+ return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index cff5bfd..b3ed885 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -505,8 +505,8 @@ public class ExecutionFlowDaoTest {
flow.setExecutionId(1);
this.executionFlowDao.uploadExecutableFlow(flow);
final Executor executor = this.executorDao.addExecutor("localhost", 12345);
- assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId())).isEqualTo(flow
- .getExecutionId());
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId(), true))
+ .isEqualTo(flow.getExecutionId());
assertThat(this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId())).isEqualTo
(executor);
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 6ee4050..d1675e8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -96,7 +96,7 @@ public class MockExecutorLoader implements ExecutorLoader {
}
private ExecutableFlow getExecutableFlowMetadata(
- ExecutableFlow fullExFlow) {
+ final ExecutableFlow fullExFlow) {
final Flow flow = new Flow(fullExFlow.getId());
final Project project = new Project(fullExFlow.getProjectId(), null);
project.setVersion(fullExFlow.getVersion());
@@ -452,7 +452,8 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+ public int selectAndUpdateExecution(final int executorId, final boolean isActive)
+ throws ExecutorManagerException {
return 1;
}
}
diff --git a/azkaban-db/src/main/sql/create.execution_flows.sql b/azkaban-db/src/main/sql/create.execution_flows.sql
index 33ebfc4..b480f46 100644
--- a/azkaban-db/src/main/sql/create.execution_flows.sql
+++ b/azkaban-db/src/main/sql/create.execution_flows.sql
@@ -12,6 +12,7 @@ CREATE TABLE execution_flows (
enc_type TINYINT,
flow_data LONGBLOB,
executor_id INT DEFAULT NULL,
+ use_executor INT DEFAULT NULL,
PRIMARY KEY (exec_id)
);
diff --git a/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
new file mode 100644
index 0000000..c3c5fd5
--- /dev/null
+++ b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
@@ -0,0 +1,6 @@
+-- DB Migration from release 3.68.0 to 3.69.0
+-- PR #2129 Implement "useExecutor" feature for new dispatching Logic (Poll model).
+-- use_executor column contains the id of the executor that should handle the execution.
+-- This id is a parameter an Azkaban admin can specify when launching a new execution.
+--
+ALTER TABLE execution_flows ADD COLUMN use_executor INT;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f418561..b9c6bf1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -981,11 +981,11 @@ public class FlowRunnerManager implements EventListener,
FlowRunnerManager.logger.error("Failed to fetch executor ", e);
}
}
- } else if (FlowRunnerManager.this.active) {
+ } else {
try {
// Todo jamiesjc: check executor capacity before polling from DB
final int execId = FlowRunnerManager.this.executorLoader
- .selectAndUpdateExecution(this.executorId);
+ .selectAndUpdateExecution(this.executorId, FlowRunnerManager.this.active);
if (execId != -1) {
FlowRunnerManager.logger.info("Submitting flow " + execId);
submitFlow(execId);