azkaban-aplcache

Implement "useExecutor" feature for new dispatching logic

2/28/2019 10:16:39 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index bc9e9df..b8692e7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -32,6 +32,7 @@ import java.util.List;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 
 @Singleton
@@ -47,9 +48,14 @@ public class ExecutionFlowDao {
 
   public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
+
+    final String useExecutorParam =
+        flow.getExecutionOptions().getFlowParameters().get(ExecutionOptions.USE_EXECUTOR);
+    final String executorId = StringUtils.isNotEmpty(useExecutorParam) ? useExecutorParam : null;
+
     final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
-        + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
-        + "values (?,?,?,?,?,?,?)";
+        + "(project_id, flow_id, version, status, submit_time, submit_user, update_time, "
+        + "use_executor) values (?,?,?,?,?,?,?,?)";
     final long submitTime = System.currentTimeMillis();
     flow.setStatus(Status.PREPARING);
     flow.setSubmitTime(submitTime);
@@ -63,7 +69,7 @@ public class ExecutionFlowDao {
     final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
       transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
           flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
-          submitTime, flow.getSubmitUser(), submitTime);
+          submitTime, flow.getSubmitUser(), submitTime, executorId);
       transOperator.getConnection().commit();
       return transOperator.getLastInsertId();
     };
@@ -280,12 +286,16 @@ public class ExecutionFlowDao {
     }
   }
 
-  public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+  public int selectAndUpdateExecution(final int executorId, final boolean isActive)
+      throws ExecutorManagerException {
     final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ? where exec_id = ?";
+    final String selectExecutionForUpdate = isActive ?
+        SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_ACTIVE :
+        SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_INACTIVE;
 
     final SQLTransaction<Integer> selectAndUpdateExecution = transOperator -> {
-      final List<Integer> execIds = transOperator.query(SelectFromExecutionFlows
-          .SELECT_EXECUTION_FOR_UPDATE, new SelectFromExecutionFlows());
+      final List<Integer> execIds = transOperator.query(selectExecutionForUpdate,
+          new SelectFromExecutionFlows(), executorId);
 
       int execId = -1;
       if (!execIds.isEmpty()) {
@@ -307,10 +317,17 @@ public class ExecutionFlowDao {
   public static class SelectFromExecutionFlows implements
       ResultSetHandler<List<Integer>> {
 
-    static String SELECT_EXECUTION_FOR_UPDATE = "SELECT exec_id from execution_flows where "
-        + "status = " + Status.PREPARING.getNumVal()
-        + " and executor_id is NULL and flow_data is NOT NULL ORDER BY submit_time ASC LIMIT 1 FOR "
-        + "UPDATE";
+    private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT =
+        "SELECT exec_id from execution_flows WHERE status = " + Status.PREPARING.getNumVal()
+            + " and executor_id is NULL and flow_data is NOT NULL and %s"
+            + " ORDER BY submit_time ASC LIMIT 1 FOR UPDATE";
+
+    public static final String SELECT_EXECUTION_FOR_UPDATE_ACTIVE =
+        String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT,
+            "(use_executor is NULL or use_executor = ?)");
+
+    public static final String SELECT_EXECUTION_FOR_UPDATE_INACTIVE =
+        String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT, "use_executor = ?");
 
     @Override
     public List<Integer> handle(final ResultSet rs) throws SQLException {
@@ -406,7 +423,7 @@ public class ExecutionFlowDao {
         final byte[] data = rs.getBytes(3);
 
         if (data == null) {
-          logger.error("Found a flow with empty data blob exec_id: " + id);
+          ExecutionFlowDao.logger.error("Found a flow with empty data blob exec_id: " + id);
         } else {
           final EncodingType encType = EncodingType.fromInteger(encodingType);
           try {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 5c0e990..9887911 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -281,5 +281,6 @@ public interface ExecutorLoader {
   int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
 
-  int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException;
+  int selectAndUpdateExecution(final int executorId, boolean isActive)
+      throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index da40fb7..6adf826 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -347,7 +347,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
-    return this.executionFlowDao.selectAndUpdateExecution(executorId);
+  public int selectAndUpdateExecution(final int executorId, final boolean isActive)
+      throws ExecutorManagerException {
+    return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive);
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index cff5bfd..b3ed885 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -505,8 +505,8 @@ public class ExecutionFlowDaoTest {
     flow.setExecutionId(1);
     this.executionFlowDao.uploadExecutableFlow(flow);
     final Executor executor = this.executorDao.addExecutor("localhost", 12345);
-    assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId())).isEqualTo(flow
-        .getExecutionId());
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId(), true))
+        .isEqualTo(flow.getExecutionId());
     assertThat(this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId())).isEqualTo
         (executor);
   }
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 6ee4050..d1675e8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -96,7 +96,7 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   private ExecutableFlow getExecutableFlowMetadata(
-      ExecutableFlow fullExFlow) {
+      final ExecutableFlow fullExFlow) {
     final Flow flow = new Flow(fullExFlow.getId());
     final Project project = new Project(fullExFlow.getProjectId(), null);
     project.setVersion(fullExFlow.getVersion());
@@ -452,7 +452,8 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public int selectAndUpdateExecution(final int executorId) throws ExecutorManagerException {
+  public int selectAndUpdateExecution(final int executorId, final boolean isActive)
+      throws ExecutorManagerException {
     return 1;
   }
 }
diff --git a/azkaban-db/src/main/sql/create.execution_flows.sql b/azkaban-db/src/main/sql/create.execution_flows.sql
index 33ebfc4..b480f46 100644
--- a/azkaban-db/src/main/sql/create.execution_flows.sql
+++ b/azkaban-db/src/main/sql/create.execution_flows.sql
@@ -12,6 +12,7 @@ CREATE TABLE execution_flows (
   enc_type    TINYINT,
   flow_data   LONGBLOB,
   executor_id INT                   DEFAULT NULL,
+  use_executor INT                  DEFAULT NULL,
   PRIMARY KEY (exec_id)
 );
 
diff --git a/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
new file mode 100644
index 0000000..c3c5fd5
--- /dev/null
+++ b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
@@ -0,0 +1,6 @@
+-- DB Migration from release 3.68.0 to 3.69.0
+-- PR #2129 Implement "useExecutor" feature for new dispatching Logic (Poll model).
+-- use_executor column contains the id of the executor that should handle the execution.
+-- This id is a parameter an Azkaban admin can specify when launching a new execution.
+--
+ALTER TABLE execution_flows ADD COLUMN use_executor INT;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f418561..b9c6bf1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -981,11 +981,11 @@ public class FlowRunnerManager implements EventListener,
             FlowRunnerManager.logger.error("Failed to fetch executor ", e);
           }
         }
-      } else if (FlowRunnerManager.this.active) {
+      } else {
         try {
           // Todo jamiesjc: check executor capacity before polling from DB
           final int execId = FlowRunnerManager.this.executorLoader
-              .selectAndUpdateExecution(this.executorId);
+              .selectAndUpdateExecution(this.executorId, FlowRunnerManager.this.active);
           if (execId != -1) {
             FlowRunnerManager.logger.info("Submitting flow " + execId);
             submitFlow(execId);