diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 1d49c98..26046b2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -21,9 +21,11 @@ import azkaban.db.DatabaseOperator;
import azkaban.db.SQLTransaction;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -96,6 +98,16 @@ public class ExecutionFlowDao {
}
}
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+ new FetchQueuedExecutableFlows());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
final int skip, final int num,
final Status status)
@@ -108,6 +120,18 @@ public class ExecutionFlowDao {
}
}
+ List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
+ throws ExecutorManagerException {
+ try {
+ return this.dbOperator.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
+ new FetchRecentlyFinishedFlows(), System.currentTimeMillis() - maxAge.toMillis(),
+ Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
+ Status.FAILED.getNumVal());
+ } catch (final SQLException e) {
+ throw new ExecutorManagerException("Error fetching recently finished flows", e);
+ }
+ }
+
List<ExecutableFlow> fetchFlowHistory(final String projContain, final String flowContains,
final String userNameContains, final int status,
final long startTime, final long endTime,
@@ -304,4 +328,105 @@ public class ExecutionFlowDao {
return execFlows;
}
}
+
+ /**
+ * JDBC ResultSetHandler to fetch queued executions
+ */
+ private static class FetchQueuedExecutableFlows implements
+ ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+ // Select queued unassigned flows
+ private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows"
+ + " Where executor_id is NULL AND status = "
+ + Status.PREPARING.getNumVal();
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
+ throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new ArrayList<>();
+ do {
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+
+ if (data == null) {
+ logger.error("Found a flow with empty data blob exec_id: " + id);
+ } else {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ final ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ final ExecutionReference ref = new ExecutionReference(id);
+ execFlows.add(new Pair<>(ref, exFlow));
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
+ private static class FetchRecentlyFinishedFlows implements
+ ResultSetHandler<List<ExecutableFlow>> {
+ // Execution_flows table is already indexed by end_time
+ private static final String FETCH_RECENTLY_FINISHED_FLOW =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+ + "WHERE end_time > ? AND status IN (?, ?, ?)";
+
+ @Override
+ public List<ExecutableFlow> handle(
+ final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<ExecutableFlow> execFlows = new ArrayList<>();
+ do {
+ final int id = rs.getInt(1);
+ final int encodingType = rs.getInt(2);
+ final byte[] data = rs.getBytes(3);
+
+ if (data != null) {
+ final EncodingType encType = EncodingType.fromInteger(encodingType);
+ final Object flowObj;
+ try {
+ if (encType == EncodingType.GZIP) {
+ final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ final String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ final ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+
+ execFlows.add(exFlow);
+ } catch (final IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+ return execFlows;
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3856122..162c34e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -20,8 +20,6 @@ import azkaban.database.AbstractJdbcLoader;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.metrics.CommonMetrics;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.GZIPUtils;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.inject.Inject;
@@ -32,8 +30,6 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
@@ -100,25 +96,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return this.executionFlowDao.fetchExecutableFlow(id);
}
- /**
- *
- * {@inheritDoc}
- * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
- */
- @Override
+ @Override
public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
-
- try {
- final List<Pair<ExecutionReference, ExecutableFlow>> flows =
- runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
- flowHandler);
- return flows;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching active flows", e);
- }
+ return this.executionFlowDao.fetchQueuedFlows();
}
/**
@@ -127,19 +108,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchRecentlyFinishedFlows flowHandler = new FetchRecentlyFinishedFlows();
-
- try {
- final List<ExecutableFlow> flows =
- runner.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW,
- flowHandler, System.currentTimeMillis() - maxAge.toMillis(),
- Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(),
- Status.FAILED.getNumVal());
- return flows;
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching recently finished flows", e);
- }
+ return this.executionFlowDao.fetchRecentlyFinishedFlows(maxAge);
}
@Override
@@ -347,55 +316,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return connection;
}
-
- /**
- *
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
- */
- @Override
+ @Override
public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
return this.executorDao.fetchAllExecutors();
}
- /**
- *
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
- */
- @Override
+ @Override
public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
return this.executorDao.fetchActiveExecutors();
}
- /**
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
- */
- @Override
+ @Override
public Executor fetchExecutor(final String host, final int port)
throws ExecutorManagerException {
return this.executorDao.fetchExecutor(host, port);
}
- /**
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
- */
- @Override
+ @Override
public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
return this.executorDao.fetchExecutor(executorId);
}
- /**
- * {@inheritDoc}
- *
- */
- @Override
+ @Override
public void updateExecutor(final Executor executor) throws ExecutorManagerException {
final String UPDATE =
"UPDATE executors SET host=?, port=?, active=? where id=?";
@@ -415,35 +357,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- /**
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
- */
@Override
public Executor addExecutor(final String host, final int port)
throws ExecutorManagerException {
return this.executorDao.addExecutor(host, port);
}
-
- /**
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#removeExecutor(String, int)
- */
- @Override
+ @Override
public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
this.executorDao.removeExecutor(host, port);
}
- /**
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
- * azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
- * java.lang.String)
- */
@Override
public void postExecutorEvent(final Executor executor, final EventType type, final String user,
final String message) throws ExecutorManagerException{
@@ -451,12 +375,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
this.executorEventsDao.postExecutorEvent(executor, type, user, message);
}
- /**
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#getExecutorEvents(azkaban.executor.Executor,
- * int, int)
- */
@Override
public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
final int offset) throws ExecutorManagerException {
@@ -464,12 +382,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return this.executorEventsDao.getExecutorEvents(executor, num, offset);
}
- /**
- *
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
- */
@Override
public void assignExecutor(final int executorId, final int executionId)
throws ExecutorManagerException {
@@ -497,12 +409,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- /**
- *
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
- */
@Override
public Executor fetchExecutorByExecutionId(final int executionId)
throws ExecutorManagerException {
@@ -516,11 +422,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return this.executionLogsDao.removeExecutionLogsByTime(millis);
}
- /**
- *
- * {@inheritDoc}
- * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
- */
@Override
public void unassignExecutor(final int executionId) throws ExecutorManagerException {
final String UPDATE =
@@ -539,121 +440,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- private static class LastInsertID implements ResultSetHandler<Long> {
- private static final String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
-
- @Override
- public Long handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return -1L;
- }
- final long id = rs.getLong(1);
- return id;
- }
- }
-
- /**
- * JDBC ResultSetHandler to fetch queued executions
- */
- private static class FetchQueuedExecutableFlows implements
- ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
- // Select queued unassigned flows
- private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows"
- + " Where executor_id is NULL AND status = "
- + Status.PREPARING.getNumVal();
-
- @Override
- public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
- throws SQLException {
- if (!rs.next()) {
- return Collections.emptyList();
- }
-
- final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
- new ArrayList<>();
- do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
-
- if (data == null) {
- logger.error("Found a flow with empty data blob exec_id: " + id);
- } else {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
- try {
- // Convoluted way to inflate strings. Should find common package or
- // helper function.
- if (encType == EncodingType.GZIP) {
- // Decompress the sucker.
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
- final ExecutionReference ref = new ExecutionReference(id);
- execFlows.add(new Pair<>(ref, exFlow));
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
- }
- } while (rs.next());
-
- return execFlows;
- }
- }
-
- private static class FetchRecentlyFinishedFlows implements
- ResultSetHandler<List<ExecutableFlow>> {
- // Execution_flows table is already indexed by end_time
- private static final String FETCH_RECENTLY_FINISHED_FLOW =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows "
- + "WHERE end_time > ? AND status IN (?, ?, ?)";
-
- @Override
- public List<ExecutableFlow> handle(
- final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return Collections.emptyList();
- }
-
- final List<ExecutableFlow> execFlows = new ArrayList<>();
- do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
-
- if (data != null) {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
- try {
- if (encType == EncodingType.GZIP) {
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
-
- execFlows.add(exFlow);
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
- }
- } while (rs.next());
-
- return execFlows;
- }
- }
-
private static class IntHandler implements ResultSetHandler<Integer> {
private static final String NUM_EXECUTIONS =
"SELECT COUNT(1) FROM execution_flows";
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 5d2d6f3..d1f1263 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -20,10 +20,13 @@ import static org.assertj.core.api.Assertions.assertThat;
import azkaban.db.DatabaseOperator;
import azkaban.test.Utils;
+import azkaban.utils.Pair;
import azkaban.utils.TestUtils;
import java.sql.SQLException;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
+import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -32,6 +35,9 @@ import org.junit.Test;
public class ExecutionFlowDaoTest {
+ private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
+ private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
+
private static DatabaseOperator dbOperator;
private ExecutionFlowDao executionFlowDao;
@@ -115,6 +121,60 @@ public class ExecutionFlowDaoTest {
assertTwoFlowSame(flowList1.get(0), fetchFlow);
}
+ @Test
+ public void testFetchRecentlyFinishedFlows() throws Exception {
+ final ExecutableFlow flow1 = createTestFlow();
+ this.executionFlowDao.uploadExecutableFlow(flow1);
+ flow1.setStatus(Status.SUCCEEDED);
+ flow1.setEndTime(System.currentTimeMillis());
+ this.executionFlowDao.updateExecutableFlow(flow1);
+
+ //Flow just finished. Fetch recently finished flows immediately. Should get it.
+ final List<ExecutableFlow> flows = this.executionFlowDao.fetchRecentlyFinishedFlows(
+ RECENTLY_FINISHED_LIFETIME);
+ assertThat(flows.size()).isEqualTo(1);
+ assertTwoFlowSame(flow1, flows.get(0));
+ }
+
+ @Test
+ public void testFetchEmptyRecentlyFinishedFlows() throws Exception {
+ final ExecutableFlow flow1 = createTestFlow();
+ this.executionFlowDao.uploadExecutableFlow(flow1);
+ flow1.setStatus(Status.SUCCEEDED);
+ flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+ this.executionFlowDao.updateExecutableFlow(flow1);
+ //Todo jamiesjc: use java8.java.time api instead of jodatime
+
+ //Mock flow finished time to be 2 min ago.
+ DateTimeUtils.setCurrentMillisOffset(-FLOW_FINISHED_TIME.toMillis());
+ flow1.setEndTime(DateTimeUtils.currentTimeMillis());
+ this.executionFlowDao.updateExecutableFlow(flow1);
+
+ //Fetch recently finished flows within 1 min. Should be empty.
+ final List<ExecutableFlow> flows = this.executionFlowDao
+ .fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
+ assertThat(flows.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testFetchQueuedFlows() throws Exception {
+
+ final ExecutableFlow flow = createTestFlow();
+ flow.setStatus(Status.PREPARING);
+ this.executionFlowDao.uploadExecutableFlow(flow);
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ flow2.setStatus(Status.PREPARING);
+ this.executionFlowDao.uploadExecutableFlow(flow2);
+
+ final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = this.executionFlowDao.fetchQueuedFlows();
+ assertThat(fetchedQueuedFlows.size()).isEqualTo(2);
+ final Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
+ final Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
+
+ assertTwoFlowSame(flow, fetchedFlow1.getSecond());
+ assertTwoFlowSame(flow2, fetchedFlow2.getSecond());
+ }
+
private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());