Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index d6f96d7..6fbe375 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -1,18 +1,18 @@
/*
-* Copyright 2018 LinkedIn Corp.
-*
-* Licensed under the Apache License, Version 2.0 (the “License”); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
-*/
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the “License”); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
package azkaban.executor;
import azkaban.Constants.ConfigurationKeys;
@@ -589,6 +589,7 @@ public class ExecutionController extends EventHandler implements ExecutorManager
final int projectId = exflow.getProjectId();
exflow.setSubmitUser(userId);
+ exflow.setStatus(Status.PREPARING);
exflow.setSubmitTime(System.currentTimeMillis());
final List<Integer> running = getRunningFlows(projectId, flowId);
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 21159fe..f7a6325 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -53,12 +53,15 @@ public class ExecutionFlowDao {
flow.getExecutionOptions().getFlowParameters().get(ExecutionOptions.USE_EXECUTOR);
final String executorId = StringUtils.isNotEmpty(useExecutorParam) ? useExecutorParam : null;
+ final String flowPriorityParam =
+ flow.getExecutionOptions().getFlowParameters().get(ExecutionOptions.FLOW_PRIORITY);
+ final int flowPriority = StringUtils.isNotEmpty(flowPriorityParam) ?
+ Integer.parseInt(flowPriorityParam) : ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+
final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time, "
- + "use_executor) values (?,?,?,?,?,?,?,?)";
- final long submitTime = System.currentTimeMillis();
- flow.setStatus(Status.PREPARING);
- flow.setSubmitTime(submitTime);
+ + "use_executor, flow_priority) values (?,?,?,?,?,?,?,?,?)";
+ final long submitTime = flow.getSubmitTime();
/**
* Why we need a transaction to get last insert ID?
@@ -68,8 +71,8 @@ public class ExecutionFlowDao {
*/
final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
- flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
- submitTime, flow.getSubmitUser(), submitTime, executorId);
+ flow.getFlowId(), flow.getVersion(), flow.getStatus().getNumVal(),
+ submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority);
transOperator.getConnection().commit();
return transOperator.getLastInsertId();
};
@@ -337,7 +340,7 @@ public class ExecutionFlowDao {
private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT =
"SELECT exec_id from execution_flows WHERE status = " + Status.PREPARING.getNumVal()
+ " and executor_id is NULL and flow_data is NOT NULL and %s"
- + " ORDER BY submit_time ASC LIMIT 1 FOR UPDATE";
+ + " ORDER BY flow_priority DESC, submit_time ASC, exec_id ASC LIMIT 1 FOR UPDATE";
public static final String SELECT_EXECUTION_FOR_UPDATE_ACTIVE =
String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT,
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4f39cd9..92120b3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -953,6 +953,7 @@ public class ExecutorManager extends EventHandler implements
} else {
final int projectId = exflow.getProjectId();
exflow.setSubmitUser(userId);
+ exflow.setStatus(Status.PREPARING);
exflow.setSubmitTime(System.currentTimeMillis());
// Get collection of running flows given a project and a specific flow name
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index b3ed885..f62543c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -18,7 +18,6 @@ package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
import azkaban.db.DatabaseOperator;
import azkaban.project.JdbcProjectImpl;
@@ -112,7 +111,12 @@ public class ExecutionFlowDaoTest {
public void testUploadAndFetchExecutionFlows() throws Exception {
final ExecutableFlow flow = createTestFlow();
+ flow.setSubmitUser("testUser1");
+ flow.setStatus(Status.PREPARING);
+ flow.setSubmitTime(System.currentTimeMillis());
+ flow.setExecutionId(0);
this.executionFlowDao.uploadExecutableFlow(flow);
+ assertThat(flow.getExecutionId()).isNotEqualTo(0);
final ExecutableFlow fetchFlow =
this.executionFlowDao.fetchExecutableFlow(flow.getExecutionId());
@@ -423,8 +427,8 @@ public class ExecutionFlowDaoTest {
private ExecutableFlow createExecution(final Status status)
throws IOException, ExecutorManagerException {
final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ flow.setSubmitTime(System.currentTimeMillis());
this.executionFlowDao.uploadExecutableFlow(flow);
- assertEquals(Status.PREPARING, flow.getStatus());
flow.setStatus(status);
this.executionFlowDao.updateExecutableFlow(flow);
return flow;
@@ -502,7 +506,8 @@ public class ExecutionFlowDaoTest {
@Test
public void testSelectAndUpdateExecution() throws Exception {
final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
- flow.setExecutionId(1);
+ flow.setStatus(Status.PREPARING);
+ flow.setSubmitTime(System.currentTimeMillis());
this.executionFlowDao.uploadExecutableFlow(flow);
final Executor executor = this.executorDao.addExecutor("localhost", 12345);
assertThat(this.executionFlowDao.selectAndUpdateExecution(executor.getId(), true))
@@ -511,6 +516,87 @@ public class ExecutionFlowDaoTest {
(executor);
}
+ @Test
+ public void testSelectAndUpdateExecutionWithPriority() throws Exception {
+ // Selecting executions when DB is empty
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected no execution selected")
+ .isEqualTo(-1);
+
+ final long currentTime = System.currentTimeMillis();
+ final ExecutableFlow lowPriorityFlow1 = submitNewFlow("exectest1", "exec1", currentTime,
+ ExecutionOptions.DEFAULT_FLOW_PRIORITY);
+
+ final ExecutableFlow highPriorityFlow = submitNewFlow("exectest1", "exec1", currentTime + 5,
+ ExecutionOptions.DEFAULT_FLOW_PRIORITY + 5);
+
+ final ExecutableFlow lowPriorityFlow2 = submitNewFlow("exectest1", "exec1", currentTime + 10,
+ ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected flow with highest priority")
+ .isEqualTo(highPriorityFlow.getExecutionId());
+
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected second flow with highest priority")
+ .isEqualTo(lowPriorityFlow2.getExecutionId());
+
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected flow with lowest priority")
+ .isEqualTo(lowPriorityFlow1.getExecutionId());
+
+ // Selecting executions when there are no more submitted flows left
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected no execution selected")
+ .isEqualTo(-1);
+ }
+
+ @Test
+ public void testSelectAndUpdateExecutionWithSamePriority() throws Exception {
+ // Selecting executions when DB is empty
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected no execution selected")
+ .isEqualTo(-1);
+
+ final long currentTime = System.currentTimeMillis();
+ final ExecutableFlow submittedFlow1 = submitNewFlow("exectest1", "exec1", currentTime,
+ ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+ final ExecutableFlow submittedFlow2 = submitNewFlow("exectest1", "exec1", currentTime + 5,
+ ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+ final ExecutableFlow submittedFlow3 = submitNewFlow("exectest1", "exec1", currentTime + 10,
+ ExecutionOptions.DEFAULT_FLOW_PRIORITY + 3);
+
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected first flow submitted")
+ .isEqualTo(submittedFlow1.getExecutionId());
+
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected second flow submitted")
+ .isEqualTo(submittedFlow2.getExecutionId());
+
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected last flow submitted")
+ .isEqualTo(submittedFlow3.getExecutionId());
+
+ // Selecting executions when there are no more submitted flows left
+ assertThat(this.executionFlowDao.selectAndUpdateExecution(-1, true))
+ .as("Expected no execution selected")
+ .isEqualTo(-1);
+ }
+
+ private ExecutableFlow submitNewFlow(final String projectName, final String flowName,
+ final long submitTime, final int flowPriority) throws IOException, ExecutorManagerException {
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow(projectName, flowName);
+ flow.setStatus(Status.PREPARING);
+ flow.setSubmitTime(submitTime);
+ flow.getExecutionOptions().getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
+ String.valueOf(flowPriority));
+ this.executionFlowDao.uploadExecutableFlow(flow);
+ return flow;
+ }
+
private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
assertTwoFlowSame(flow1, flow2, true);
}
diff --git a/azkaban-db/src/main/sql/create.execution_flows.sql b/azkaban-db/src/main/sql/create.execution_flows.sql
index b480f46..f7fde4c 100644
--- a/azkaban-db/src/main/sql/create.execution_flows.sql
+++ b/azkaban-db/src/main/sql/create.execution_flows.sql
@@ -13,6 +13,7 @@ CREATE TABLE execution_flows (
flow_data LONGBLOB,
executor_id INT DEFAULT NULL,
use_executor INT DEFAULT NULL,
+ flow_priority TINYINT NOT NULL DEFAULT 5,
PRIMARY KEY (exec_id)
);
diff --git a/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
index c3c5fd5..e1f8447 100644
--- a/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
+++ b/azkaban-db/src/main/sql/upgrade.3.68.0.to.3.69.0.sql
@@ -3,4 +3,4 @@
-- use_executor column contains the id of the executor that should handle the execution.
-- This id is a parameter an Azkaban admin can specify when launching a new execution.
--
-ALTER TABLE execution_flows ADD COLUMN use_executor INT;
+ALTER TABLE execution_flows ADD COLUMN use_executor INT DEFAULT NULL;
diff --git a/azkaban-db/src/main/sql/upgrade.3.69.0.to.3.70.0.sql b/azkaban-db/src/main/sql/upgrade.3.69.0.to.3.70.0.sql
new file mode 100644
index 0000000..02fc7ba
--- /dev/null
+++ b/azkaban-db/src/main/sql/upgrade.3.69.0.to.3.70.0.sql
@@ -0,0 +1,8 @@
+-- DB Migration from release 3.69.0 to 3.70.0
+-- PR #2140 Implements “flowPriority” (at the time of polling) feature for new dispatching logic.
+-- flow_priority column can hold a positive int, negative int or 5 which is the default flow
+-- priority set in ExecutionOptions.DEFAULT_FLOW_PRIORITY for all flows.
+-- "flowPriority" is an execution option an Azkaban admin can specify when launching a new execution.
+-- It will allow a flow to be dispatched or polled first.
+--
+ALTER TABLE execution_flows ADD COLUMN flow_priority TINYINT NOT NULL DEFAULT 5;