Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index c0bf5c3..2a26c1a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -20,6 +20,7 @@ public class ExecutionReference {
private final int execId;
private Executor executor;
+ //Todo jamiesjc: deprecate updateTime in ExecutionReference class gradually.
private long updateTime;
private long nextCheckTime = -1;
private int numErrors = 0;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index b1e315b..a3f761f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1034,8 +1034,11 @@ public class ExecutorManager extends EventHandler implements
try {
dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
- executorLoader.removeActiveExecutableReference(reference
- .getExecId());
+ // When flow dispatch fails, should update the flow status
+ // to FAILED in execution_flows DB table as well. Currently
+ // this logic is only implemented in multiExecutorMode but
+ // missed in single executor case.
+ finalizeFlows(exflow);
throw e;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 4bc8e68..1f31a0a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -1331,27 +1331,23 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
// Select queued unassigned flows
private static String FETCH_QUEUED_EXECUTABLE_FLOW =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
- + " ax.update_time axUpdateTime FROM execution_flows ex"
- + " INNER JOIN"
- + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
- + " Where ex.executor_id is NULL";
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows"
+ + " Where executor_id is NULL AND status = "
+ + Status.PREPARING.getNumVal();
@Override
public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
throws SQLException {
if (!rs.next()) {
- return Collections
- .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+ return Collections.emptyList();
}
List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
- new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ new ArrayList<>();
do {
int id = rs.getInt(1);
int encodingType = rs.getInt(2);
byte[] data = rs.getBytes(3);
- long updateTime = rs.getLong(4);
if (data == null) {
logger.error("Found a flow with empty data blob exec_id: " + id);
@@ -1373,10 +1369,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
ExecutionReference ref = new ExecutionReference(id);
- ref.setUpdateTime(updateTime);
-
- execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
- exFlow));
+ execFlows.add(new Pair<>(ref, exFlow));
} catch (IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
@@ -1438,19 +1431,20 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
// Select running and executor assigned flows
private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
- + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+ + "et.port port, et.id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
+ " INNER JOIN "
- + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
- + " INNER JOIN "
- + " executors et ON ex.executor_id = et.id";
+ + " executors et ON ex.executor_id = et.id"
+ + " Where ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")";
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
ResultSet rs) throws SQLException {
if (!rs.next()) {
- return Collections
- .<Integer, Pair<ExecutionReference, ExecutableFlow>> emptyMap();
+ return Collections.emptyMap();
}
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
@@ -1461,9 +1455,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
byte[] data = rs.getBytes(3);
String host = rs.getString(4);
int port = rs.getInt(5);
- long updateTime = rs.getLong(6);
- int executorId = rs.getInt(7);
- boolean executorStatus = rs.getBoolean(8);
+ int executorId = rs.getInt(6);
+ boolean executorStatus = rs.getBoolean(7);
if (data == null) {
execFlows.put(id, null);
@@ -1486,10 +1479,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutableFlow.createExecutableFlowFromObject(flowObj);
Executor executor = new Executor(executorId, host, port, executorStatus);
ExecutionReference ref = new ExecutionReference(id, executor);
- ref.setUpdateTime(updateTime);
-
- execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
- exFlow));
+ execFlows.put(id, new Pair<>(ref, exFlow));
} catch (IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
}
@@ -1504,13 +1494,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
private static String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
- + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+ + "et.port port, et.id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
+ " INNER JOIN "
- + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
- + " INNER JOIN "
+ " executors et ON ex.executor_id = et.id"
- + " WHERE ax.exec_id = ?";
+ + " Where ex.exec_id = ? AND ex.status NOT IN ("
+ + Status.SUCCEEDED.getNumVal() + ", "
+ + Status.KILLED.getNumVal() + ", "
+ + Status.FAILED.getNumVal() + ")";
@Override
public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
@@ -1527,9 +1518,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
byte[] data = rs.getBytes(3);
String host = rs.getString(4);
int port = rs.getInt(5);
- long updateTime = rs.getLong(6);
- int executorId = rs.getInt(7);
- boolean executorStatus = rs.getBoolean(8);
+ int executorId = rs.getInt(6);
+ boolean executorStatus = rs.getBoolean(7);
if (data == null) {
logger.error("Found a flow with empty data blob exec_id: " + id);
@@ -1549,8 +1539,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutableFlow.createExecutableFlowFromObject(flowObj);
Executor executor = new Executor(executorId, host, port, executorStatus);
ExecutionReference ref = new ExecutionReference(id, executor);
- ref.setUpdateTime(updateTime);
-
execFlows.add(new Pair<>(ref, exFlow));
} catch (IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index ea0d43d..6c1138a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -471,18 +471,17 @@ public class JdbcExecutorLoaderTest {
int port = 12345;
Executor executor = loader.addExecutor(host, port);
+ // When a flow is assigned an executor, it is no longer in queued state
ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
- // only completed flows
Assert.assertTrue(queuedFlows.isEmpty());
+ // When flow status is finished, it is no longer in queued state
ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
- loader.uploadExecutableFlow(flow);
- loader.assignExecutor(executor.getId(), flow.getExecutionId());
- ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
- loader.addActiveExecutableReference(ref);
- // only running and completed flows
+ loader.uploadExecutableFlow(flow2);
+ flow2.setStatus(Status.SUCCEEDED);
+ loader.updateExecutableFlow(flow2);
Assert.assertTrue(queuedFlows.isEmpty());
}
@@ -495,29 +494,20 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
- new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow2);
- ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
- loader.addActiveExecutableReference(ref2);
- ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
- loader.addActiveExecutableReference(ref);
-
List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader.fetchQueuedFlows();
Assert.assertEquals(2, fetchedQueuedFlows.size());
Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
- Assert.assertEquals(ref.getExecId(), fetchedFlow1.getFirst().getExecId());
Assert.assertEquals(flow.getExecutionId(), fetchedFlow1.getSecond().getExecutionId());
Assert.assertEquals(flow.getFlowId(), fetchedFlow1.getSecond().getFlowId());
Assert.assertEquals(flow.getProjectId(), fetchedFlow1.getSecond().getProjectId());
- Assert.assertEquals(ref2.getExecId(), fetchedFlow2.getFirst().getExecId());
Assert.assertEquals(flow2.getExecutionId(), fetchedFlow2.getSecond().getExecutionId());
Assert.assertEquals(flow2.getFlowId(), fetchedFlow2.getSecond().getFlowId());
Assert.assertEquals(flow2.getProjectId(), fetchedFlow2.getSecond().getProjectId());
@@ -785,32 +775,28 @@ public class JdbcExecutorLoaderTest {
}
@Test
- public void testActiveReference() throws Exception {
+ public void testFetchActiveFlowsExecutorAssigned() throws Exception {
if (!isTestSetup()) {
return;
}
+ // Upload flow1, executor assigned
ExecutorLoader loader = createLoader();
ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
Executor executor = loader.addExecutor("test", 1);
loader.assignExecutor(executor.getId(), flow1.getExecutionId());
- ExecutionReference ref1 =
- new ExecutionReference(flow1.getExecutionId(), executor);
- loader.addActiveExecutableReference(ref1);
- ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ // Upload flow2, executor not assigned
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow2);
- loader.assignExecutor(executor.getId(), flow2.getExecutionId());
- ExecutionReference ref2 =
- new ExecutionReference(flow2.getExecutionId(), executor);
- loader.addActiveExecutableReference(ref2);
-
- ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
- loader.uploadExecutableFlow(flow3);
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
loader.fetchActiveFlows();
+
+ Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
+ Assert.assertFalse(activeFlows1.containsKey(flow2.getExecutionId()));
+
ExecutableFlow flow1Result =
activeFlows1.get(flow1.getExecutionId()).getSecond();
Assert.assertNotNull(flow1Result);
@@ -824,28 +810,64 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow1.getVersion(), flow1Result.getVersion());
Assert.assertEquals(flow1.getExecutionOptions().getFailureAction(),
flow1Result.getExecutionOptions().getFailureAction());
+ }
+
+ @Test
+ public void testFetchActiveFlowsStatusChanged() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ // Flow status is PREPARING when uploaded, should be in active flows
+ loader.uploadExecutableFlow(flow1);
+ Executor executor = loader.addExecutor("test", 1);
+ loader.assignExecutor(executor.getId(), flow1.getExecutionId());
+
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
+ loader.fetchActiveFlows();
+ Assert.assertTrue(activeFlows.containsKey(flow1.getExecutionId()));
+
+ // When flow status becomes SUCCEEDED/KILLED/FAILED, it should not be in active state
+ flow1.setStatus(Status.SUCCEEDED);
+ loader.updateExecutableFlow(flow1);
+ activeFlows = loader.fetchActiveFlows();
+ Assert.assertFalse(activeFlows.containsKey(flow1.getExecutionId()));
- ExecutableFlow flow1Result2 =
- activeFlows1.get(flow2.getExecutionId()).getSecond();
- Assert.assertNotNull(flow1Result2);
- Assert.assertTrue(flow2 != flow1Result2);
- Assert.assertEquals(flow2.getExecutionId(), flow1Result2.getExecutionId());
- Assert.assertEquals(flow2.getEndTime(), flow1Result2.getEndTime());
- Assert.assertEquals(flow2.getStartTime(), flow1Result2.getStartTime());
- Assert.assertEquals(flow2.getSubmitTime(), flow1Result2.getSubmitTime());
- Assert.assertEquals(flow2.getFlowId(), flow1Result2.getFlowId());
- Assert.assertEquals(flow2.getProjectId(), flow1Result2.getProjectId());
- Assert.assertEquals(flow2.getVersion(), flow1Result2.getVersion());
- Assert.assertEquals(flow2.getExecutionOptions().getFailureAction(),
- flow1Result2.getExecutionOptions().getFailureAction());
-
- loader.removeActiveExecutableReference(flow2.getExecutionId());
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows2 =
+ flow1.setStatus(Status.KILLED);
+ loader.updateExecutableFlow(flow1);
+ activeFlows = loader.fetchActiveFlows();
+ Assert.assertFalse(activeFlows.containsKey(flow1.getExecutionId()));
+
+ flow1.setStatus(Status.FAILED);
+ loader.updateExecutableFlow(flow1);
+ activeFlows = loader.fetchActiveFlows();
+ Assert.assertFalse(activeFlows.containsKey(flow1.getExecutionId()));
+ }
+
+ @Test
+ public void testFetchActiveFlowsReferenceChanged() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow1);
+ Executor executor = loader.addExecutor("test", 1);
+ loader.assignExecutor(executor.getId(), flow1.getExecutionId());
+ ExecutionReference ref1 =
+ new ExecutionReference(flow1.getExecutionId(), executor);
+ loader.addActiveExecutableReference(ref1);
+
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
loader.fetchActiveFlows();
+ Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
- Assert.assertTrue(activeFlows2.containsKey(flow1.getExecutionId()));
- Assert.assertFalse(activeFlows2.containsKey(flow3.getExecutionId()));
- Assert.assertFalse(activeFlows2.containsKey(flow2.getExecutionId()));
+ // Verify active flows are not fetched from active_executing_flows DB table any more
+ loader.removeActiveExecutableReference(flow1.getExecutionId());
+ Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
}
@Test
@@ -859,8 +881,6 @@ public class JdbcExecutorLoaderTest {
loader.uploadExecutableFlow(flow1);
Executor executor = loader.addExecutor("test", 1);
loader.assignExecutor(executor.getId(), flow1.getExecutionId());
- ExecutionReference ref1 = new ExecutionReference(flow1.getExecutionId(), executor);
- loader.addActiveExecutableReference(ref1);
Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
loader.fetchActiveFlowByExecId(flow1.getExecutionId());
@@ -868,8 +888,6 @@ public class JdbcExecutorLoaderTest {
ExecutionReference execRef1 = activeFlow1.getFirst();
ExecutableFlow execFlow1 = activeFlow1.getSecond();
Assert.assertNotNull(execRef1);
- Assert.assertEquals(ref1.getExecId(), execRef1.getExecId());
- Assert.assertEquals(ref1.getExecutor(), execRef1.getExecutor());
Assert.assertNotNull(execFlow1);
Assert.assertEquals(flow1.getExecutionId(), execFlow1.getExecutionId());
Assert.assertEquals(flow1.getFlowId(), execFlow1.getFlowId());