azkaban-memoizeit

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index bf87720..75845a5 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -200,26 +200,42 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	}
 	
 	@Override
-	public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
-		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+	public List<ExecutableFlow> getExecutableFlows(
+      Project project, String flowId, int skip, int size) 
+      throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+        project.getId(), flowId, skip, size);
 		return flows;
 	}
 	
 	@Override
-	public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
+	public List<ExecutableFlow> getExecutableFlows(int skip, int size) 
+      throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
 		return flows;
 	}
 	
 	@Override
-	public List<ExecutableFlow> getExecutableFlows(String flowIdContains, int skip, int size) throws ExecutorManagerException {
-		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+	public List<ExecutableFlow> getExecutableFlows(
+      String flowIdContains, int skip, int size) 
+      throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+        null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
 		return flows;
 	}
 
 	@Override
-	public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String userContain, int status, long begin, long end, int skip, int size) throws ExecutorManagerException {
-		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projContain, flowContain, userContain, status, begin, end , skip, size);
+	public List<ExecutableFlow> getExecutableFlows(
+      String projContain, 
+      String flowContain, 
+      String userContain, 
+      int status, 
+      long begin, 
+      long end, 
+      int skip, 
+      int size) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+        projContain, flowContain, userContain, status, begin, end , skip, size);
 		return flows;
 	}
 	
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 5373b7d..6d44b57 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -180,12 +180,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
-	public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException {
+	public int fetchNumExecutableFlows(int projectId, String flowId) 
+			throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 		
 		IntHandler intHandler = new IntHandler();
 		try {
-			int count = runner.query(IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
+			int count = runner.query(
+					IntHandler.NUM_FLOW_EXECUTIONS, intHandler, projectId, flowId);
 			return count;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching num executions", e);
@@ -193,12 +195,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
-	public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException {
+	public int fetchNumExecutableNodes(int projectId, String jobId) 
+			throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 		
 		IntHandler intHandler = new IntHandler();
 		try {
-			int count = runner.query(IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
+			int count = runner.query(
+					IntHandler.NUM_JOB_EXECUTIONS, intHandler, projectId, jobId);
 			return count;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching num executions", e);
@@ -206,12 +210,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
-	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, 
+			int skip, int num) throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
 		try {
-			List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY, flowHandler, projectId, flowId, skip, num);
+			List<ExecutableFlow> properties = runner.query(
+					FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY, 
+					flowHandler, 
+					projectId, 
+					flowId, 
+					skip, 
+					num);
 			return properties;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching active flows", e);
@@ -219,12 +230,21 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
-	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+	public List<ExecutableFlow> fetchFlowHistory(
+			int projectId, String flowId, int skip, int num, Status status) 
+			throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
 		try {
-			List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, flowHandler, projectId, flowId, status.getNumVal(), skip, num);
+			List<ExecutableFlow> properties = runner.query(
+					FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, 
+					flowHandler, 
+					projectId, 
+					flowId, 
+					status.getNumVal(), 
+					skip, 
+					num);
 			return properties;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching active flows", e);
@@ -232,13 +252,18 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
-	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
+	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) 
+			throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 		
 		try {
-			List<ExecutableFlow> properties = runner.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY, flowHandler, skip, num);
+			List<ExecutableFlow> properties = runner.query(
+					FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY, 
+					flowHandler, 
+					skip, 
+					num);
 			return properties;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching active flows", e);
@@ -247,7 +272,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	
 
 	@Override
-	public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startTime, long endTime, int skip, int num) throws ExecutorManagerException {
+	public List<ExecutableFlow> fetchFlowHistory(
+			String projContain, 
+			String flowContains, 
+			String userNameContains, 
+			int status, 
+			long startTime, 
+			long endTime, 
+			int skip, 
+			int num) throws ExecutorManagerException {
 		String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
 		ArrayList<Object> params = new ArrayList<Object>();
 		
@@ -329,7 +362,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 
 		try {
-			List<ExecutableFlow> properties = runner.query(query, flowHandler, params.toArray());
+			List<ExecutableFlow> properties = runner.query(
+					query, flowHandler, params.toArray());
 			return properties;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching active flows", e);
@@ -839,13 +873,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		
 	}
 	
-	private static class FetchExecutableFlows implements ResultSetHandler<List<ExecutableFlow>> {
-		private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
-		private static String FETCH_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?";
-		//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
-		private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
-		private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
-		private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
+	private static class FetchExecutableFlows 
+			implements ResultSetHandler<List<ExecutableFlow>> {
+		private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY = 
+				"SELECT exec_id, enc_type, flow_data FROM execution_flows ";
+		private static String FETCH_EXECUTABLE_FLOW = 
+				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+						"WHERE exec_id=?";
+		//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = 
+		//	"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data " +
+		//			"FROM execution_flows ex " +
+		//			"INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+		private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = 
+				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+						"ORDER BY exec_id DESC LIMIT ?, ?";
+		private static String FETCH_EXECUTABLE_FLOW_HISTORY = 
+				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+						"WHERE project_id=? AND flow_id=? " +
+						"ORDER BY exec_id DESC LIMIT ?, ?";
+		private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = 
+				"SELECT exec_id, enc_type, flow_data FROM execution_flows " +
+						"WHERE project_id=? AND flow_id=? AND status=? " +
+						"ORDER BY exec_id DESC LIMIT ?, ?";
 		
 		@Override
 		public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
@@ -863,7 +912,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 					EncodingType encType = EncodingType.fromInteger(encodingType);
 					Object flowObj;
 					try {
-						// Convoluted way to inflate strings. Should find common package or helper function.
+						// Convoluted way to inflate strings. Should find common package 
+						// or helper function.
 						if (encType == EncodingType.GZIP) {
 							// Decompress the sucker.
 							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
@@ -874,7 +924,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 							flowObj = JSONUtils.parseJSONFromString(jsonString);
 						}
 						
-						ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(flowObj);
+						ExecutableFlow exFlow = 
+								ExecutableFlow.createExecutableFlowFromObject(flowObj);
 						execFlows.add(exFlow);
 					} catch (IOException e) {
 						throw new SQLException("Error retrieving flow data " + id, e);