diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index bf87720..75845a5 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -200,26 +200,42 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
}
@Override
- public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
- List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+ public List<ExecutableFlow> getExecutableFlows(
+ Project project, String flowId, int skip, int size)
+ throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+ project.getId(), flowId, skip, size);
return flows;
}
@Override
- public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
+ public List<ExecutableFlow> getExecutableFlows(int skip, int size)
+ throws ExecutorManagerException {
List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
- public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
- List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+ public List<ExecutableFlow> getExecutableFlows(
+ String flowIdContains, int skip, int size)
+ throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+ null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
return flows;
}
@Override
- public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
- List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
+ public List<ExecutableFlow> getExecutableFlows(
+ String projContain,
+ String flowContain,
+ String userContain,
+ int status,
+ long begin,
+ long end,
+ int skip,
+ int size) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+ projContain, flowContain, userContain, status, begin, end , skip, size);
return flows;
}
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 5373b7d..6d44b57 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -180,12 +180,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException {
+ public int fetchNumExecutableFlows(int projectId, String flowId)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
IntHandler intHandler = new IntHandler();
try {
- int count = runner.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
+ int count = runner.query(
+ IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
return count;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
@@ -193,12 +195,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException {
+ public int fetchNumExecutableNodes(int projectId, String jobId)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
IntHandler intHandler = new IntHandler();
try {
- int count = runner.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
+ int count = runner.query(
+ IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
return count;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching num executions", e);
@@ -206,12 +210,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
+ public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
+ int skip, int num) throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY, flowHandler, projectId, flowId, skip, num);
+ List<ExecutableFlow> properties = runner.query(
+ FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
+ flowHandler,
+ projectId,
+ flowId,
+ skip,
+ num);
return properties;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
@@ -219,12 +230,21 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+ public List<ExecutableFlow> fetchFlowHistory(
+ int projectId, String flowId, int skip, int num, Status status)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, flowHandler, projectId, flowId, status.getNumVal(), skip, num);
+ List<ExecutableFlow> properties = runner.query(
+ FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
+ flowHandler,
+ projectId,
+ flowId,
+ status.getNumVal(),
+ skip,
+ num);
return properties;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
@@ -232,13 +252,18 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
@Override
- public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
+ public List<ExecutableFlow> fetchFlowHistory(int skip, int num)
+ throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY, flowHandler, skip, num);
+ List<ExecutableFlow> properties = runner.query(
+ FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
+ flowHandler,
+ skip,
+ num);
return properties;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
@@ -247,7 +272,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
@Override
- public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startTime, long endTime, int skip, int num) throws ExecutorManagerException {
+ public List<ExecutableFlow> fetchFlowHistory(
+ String projContain,
+ String flowContains,
+ String userNameContains,
+ int status,
+ long startTime,
+ long endTime,
+ int skip,
+ int num) throws ExecutorManagerException {
String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
ArrayList<Object> params = new ArrayList<Object>();
@@ -329,7 +362,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
FetchExecutableFlows flowHandler = new FetchExecutableFlows();
try {
- List<ExecutableFlow> properties = runner.query(query, flowHandler, params.toArray());
+ List<ExecutableFlow> properties = runner.query(
+ query, flowHandler, params.toArray());
return properties;
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching active flows", e);
@@ -839,13 +873,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
}
- private static class FetchExecutableFlows implements ResultSetHandler<List<ExecutableFlow>> {
- private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
- private static String FETCH_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?";
- //private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
- private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
- private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
- private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
+ private static class FetchExecutableFlows
+ implements ResultSetHandler<List<ExecutableFlow>> {
+ private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
+ private static String FETCH_EXECUTABLE_FLOW =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+ "WHERE exec_id=?";
+ //private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
+ // "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data " +
+ // "FROM execution_flows ex " +
+ // "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+ private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+ "ORDER BY exec_id DESC LIMIT ?, ?";
+ private static String FETCH_EXECUTABLE_FLOW_HISTORY =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+ "WHERE project_id=? AND flow_id=? " +
+ "ORDER BY exec_id DESC LIMIT ?, ?";
+ private static String FETCH_EXECUTABLE_FLOW_BY_STATUS =
+ "SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+ "WHERE project_id=? AND flow_id=? AND status=? " +
+ "ORDER BY exec_id DESC LIMIT ?, ?";
@Override
public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
@@ -863,7 +912,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
EncodingType encType = EncodingType.fromInteger(encodingType);
Object flowObj;
try {
- // Convoluted way to inflate strings. Should find common package or helper function.
+ // Convoluted way to inflate strings. Should find common package
+ // or helper function.
if (encType == EncodingType.GZIP) {
// Decompress the sucker.
String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
@@ -874,7 +924,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
flowObj = JSONUtils.parseJSONFromString(jsonString);
}
- ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
execFlows.add(exFlow);
} catch (IOException e) {
throw new SQLException("Error retrieving flow data " + id, e);