azkaban-aplcache

Implement "flowPriority" (at the time of polling) feature

3/20/2019 7:01:24 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index d6f96d7..6fbe375 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -1,18 +1,18 @@
 /*
-* Copyright 2018 LinkedIn Corp.
-*
-* Licensed under the Apache License, Version 2.0 (the “License”); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the “License”); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package azkaban.executor;
 
 import azkaban.Constants.ConfigurationKeys;
@@ -589,6 +589,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
 
       final int projectId = exflow.getProjectId();
       exflow.setSubmitUser(userId);
+      exflow.setStatus(Status.PREPARING);
       exflow.setSubmitTime(System.currentTimeMillis());
 
       final List<Integer> running = getRunningFlows(projectId, flowId);
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 21159fe..f7a6325 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -53,12 +53,15 @@ public class ExecutionFlowDao {
         flow.getExecutionOptions().getFlowParameters().get(ExecutionOptions.USE_EXECUTOR);
     final String executorId = StringUtils.isNotEmpty(useExecutorParam) ? useExecutorParam : null;
 
+    final String flowPriorityParam =
+        flow.getExecutionOptions().getFlowParameters().get(ExecutionOptions.FLOW_PRIORITY);
+    final int flowPriority = StringUtils.isNotEmpty(flowPriorityParam) ?
+        Integer.parseInt(flowPriorityParam) : ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+
     final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
         + "(project_id, flow_id, version, status, submit_time, submit_user, update_time, "
-        + "use_executor) values (?,?,?,?,?,?,?,?)";
-    final long submitTime = System.currentTimeMillis();
-    flow.setStatus(Status.PREPARING);
-    flow.setSubmitTime(submitTime);
+        + "use_executor, flow_priority) values (?,?,?,?,?,?,?,?,?)";
+    final long submitTime = flow.getSubmitTime();
 
     /**
      * Why we need a transaction to get last insert ID?
@@ -68,8 +71,8 @@ public class ExecutionFlowDao {
      */
     final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
       transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
-          flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
-          submitTime, flow.getSubmitUser(), submitTime, executorId);
+          flow.getFlowId(), flow.getVersion(), flow.getStatus().getNumVal(),
+          submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority);
       transOperator.getConnection().commit();
       return transOperator.getLastInsertId();
     };
@@ -337,7 +340,7 @@ public class ExecutionFlowDao {
     private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT =
         "SELECT exec_id from execution_flows WHERE status = " + Status.PREPARING.getNumVal()
             + " and executor_id is NULL and flow_data is NOT NULL and %s"
-            + " ORDER BY submit_time ASC LIMIT 1 FOR UPDATE";
+            + " ORDER BY flow_priority DESC, submit_time ASC, exec_id ASC LIMIT 1 FOR UPDATE";
 
     public static final String SELECT_EXECUTION_FOR_UPDATE_ACTIVE =
         String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT,
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4f39cd9..92120b3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -953,6 +953,7 @@ public class ExecutorManager extends EventHandler implements
       } else {
         final int projectId = exflow.getProjectId();
         exflow.setSubmitUser(userId);
+        exflow.setStatus(Status.PREPARING);
         exflow.setSubmitTime(System.currentTimeMillis());
 
         // Get collection of running flows given a project and a specific flow name
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index b3ed885..f62543c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -18,7 +18,6 @@ package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.project.JdbcProjectImpl;
@@ -112,7 +111,12 @@ public class ExecutionFlowDaoTest {
   public void testUploadAndFetchExecutionFlows() throws Exception {
 
     final ExecutableFlow flow = createTestFlow();
+    flow.setSubmitUser("testUser1");
+    flow.setStatus(Status.PREPARING);
+    flow.setSubmitTime(System.currentTimeMillis());
+    flow.setExecutionId(0);
     this.executionFlowDao.uploadExecutableFlow(flow);
+    assertThat(flow.getExecutionId()).isNotEqualTo(0);
 
     final ExecutableFlow fetchFlow =
         this.executionFlowDao.fetchExecutableFlow(flow.getExecutionId());
@@ -423,8 +427,8 @@ public class ExecutionFlowDaoTest {
   private ExecutableFlow createExecution(final Status status)
       throws IOException, ExecutorManagerException {
     final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    flow.setSubmitTime(System.currentTimeMillis());
     this.executionFlowDao.uploadExecutableFlow(flow);
-    assertEquals(Status.PREPARING, flow.getStatus());
     flow.setStatus(status);
     this.executionFlowDao.updateExecutableFlow(flow);
     return flow;
@@ -502,7 +506,8 @@ public class ExecutionFlowDaoTest {
   @Test
   public void testSelectAndUpdateExecution() throws Exception {
     final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
-    flow.setExecutionId(1);
+    flow.setStatus(Status.PREPARING);
+    flow.setSubmitTime(System.currentTimeMillis());
     this.executionFlowDao.uploadExecutableFlow(flow);
     final Executor executor = this.executorDao.addExecutor("localhost", 12345);
     assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId(), true))
@@ -511,6 +516,87 @@ public class ExecutionFlowDaoTest {
         (executor);
   }
 
+  @Test
+  public void testSelectAndUpdateExecutionWithPriority() throws Exception {
+    // Selecting executions when DB is empty
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected no execution selected")
+        .isEqualTo(-1);
+
+    final long currentTime = System.currentTimeMillis();
+    final ExecutableFlow lowPriorityFlow1 = submitNewFlow("exectest1", "exec1", currentTime,
+        ExecutionOptions.DEFAULT_FLOW_PRIORITY);
+
+    final ExecutableFlow highPriorityFlow = submitNewFlow("exectest1", "exec1", currentTime + 5,
+        ExecutionOptions.DEFAULT_FLOW_PRIORITY + 5);
+
+    final ExecutableFlow lowPriorityFlow2 = submitNewFlow("exectest1", "exec1", currentTime + 10,
+        ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected flow with highest priority")
+        .isEqualTo(highPriorityFlow.getExecutionId());
+
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected second flow with highest priority")
+        .isEqualTo(lowPriorityFlow2.getExecutionId());
+
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected flow with lowest priority")
+        .isEqualTo(lowPriorityFlow1.getExecutionId());
+
+    // Selecting executions when there are no more submitted flows left
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected no execution selected")
+        .isEqualTo(-1);
+  }
+
+  @Test
+  public void testSelectAndUpdateExecutionWithSamePriority() throws Exception {
+    // Selecting executions when DB is empty
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected no execution selected")
+        .isEqualTo(-1);
+
+    final long currentTime = System.currentTimeMillis();
+    final ExecutableFlow submittedFlow1 = submitNewFlow("exectest1", "exec1", currentTime,
+        ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+    final ExecutableFlow submittedFlow2 = submitNewFlow("exectest1", "exec1", currentTime + 5,
+        ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+    final ExecutableFlow submittedFlow3 = submitNewFlow("exectest1", "exec1", currentTime + 10,
+        ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected first flow submitted")
+        .isEqualTo(submittedFlow1.getExecutionId());
+
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected second flow submitted")
+        .isEqualTo(submittedFlow2.getExecutionId());
+
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected last flow submitted")
+        .isEqualTo(submittedFlow3.getExecutionId());
+
+    // Selecting executions when there are no more submitted flows left
+    assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+        .as("Expected no execution selected")
+        .isEqualTo(-1);
+  }
+
+  private ExecutableFlow submitNewFlow(final String projectName, final String flowName,
+      final long submitTime, final int flowPriority) throws IOException, ExecutorManagerException {
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow(projectName, flowName);
+    flow.setStatus(Status.PREPARING);
+    flow.setSubmitTime(submitTime);
+    flow.getExecutionOptions().getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
+        String.valueOf(flowPriority));
+    this.executionFlowDao.uploadExecutableFlow(flow);
+    return flow;
+  }
+
   private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
     assertTwoFlowSame(flow1, flow2, true);
   }
diff --git a/azkaban-db/src/main/sql/create.execution_flows.sql b/azkaban-db/src/main/sql/create.execution_flows.sql
index b480f46..f7fde4c 100644
--- a/azkaban-db/src/main/sql/create.execution_flows.sql
+++ b/azkaban-db/src/main/sql/create.execution_flows.sql
@@ -13,6 +13,7 @@ CREATE TABLE execution_flows (
   flow_data   LONGBLOB,
   executor_id INT                   DEFAULT NULL,
   use_executor INT                  DEFAULT NULL,
+  flow_priority TINYINT    NOT NULL DEFAULT 5,
   PRIMARY KEY (exec_id)
 );
 
diff --git a/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
index c3c5fd5..e1f8447 100644
--- a/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
+++ b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
@@ -3,4 +3,4 @@
 -- use_executor column contains the id of the executor that should handle the execution.
 -- This id is a parameter an Azkaban admin can specify when launching a new execution.
 --
-ALTER TABLE execution_flows ADD COLUMN use_executor INT;
+ALTER TABLE execution_flows ADD COLUMN use_executor INT DEFAULT NULL;
diff --git a/azkaban-db/src/main/sql/upgrade.3.69.0.to.3.70.0.sql b/azkaban-db/src/main/sql/upgrade.3.69.0.to.3.70.0.sql
new file mode 100644
index 0000000..02fc7ba
--- /dev/null
+++ b/azkaban-db/src/main/sql/upgrade.3.69.0.to.3.70.0.sql
@@ -0,0 +1,8 @@
+-- DB Migration from release 3.69.0 to 3.70.0
+-- PR #2140 Implements “flowPriority” (at the time of polling) feature for new dispatching logic.
+-- flow_priority column can hold a positive int, negative int or 5 which is the default flow
+-- priority set in ExecutionOptions.DEFAULT_FLOW_PRIORITY for all flows.
+-- "flowPriority" is an execution option an Azkaban admin can specify when launching a new execution.
+-- It will allow a flow to be dispatched or polled first.
+--
+ALTER TABLE execution_flows ADD COLUMN flow_priority TINYINT NOT NULL DEFAULT 5;