azkaban-memoizeit

Added schedule estimation

5/23/2013 5:30:00 PM


diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index c0108e2..97581e9 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -35,6 +35,8 @@ public interface ExecutorLoader {
 
 	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException;
 
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException;
+
 	public List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startData, long endData, int skip, int num) throws ExecutorManagerException;
 
 	public void addActiveExecutableReference(ExecutionReference ref) throws ExecutorManagerException;
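
The new fetchFlowHistory overload narrows history to a single execution Status, so callers can page through only the runs they care about. A minimal caller sketch, assuming an ExecutorLoader instance is at hand (the class and method names here are illustrative, not part of the commit):

import java.util.List;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;

public class SucceededHistoryExample {
	// Fetch the ten most recent SUCCEEDED executions of a flow,
	// newest first (skip = 0, num = 10).
	public static List<ExecutableFlow> lastTenSuccesses(ExecutorLoader loader, int projectId, String flowId) throws ExecutorManagerException {
		return loader.fetchFlowHistory(projectId, flowId, 0, 10, Status.SUCCEEDED);
	}
}
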
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 07b81a8..527956d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.project.Project;
+import azkaban.scheduler.ScheduleStatisticManager;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
@@ -661,6 +662,9 @@ public class ExecutorManager {
 						evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
 						// Add new finished
 						for (ExecutableFlow flow: finishedFlows) {
+							if (flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED) {
+								ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
+							}
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
@@ -942,7 +946,11 @@ public class ExecutorManager {
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
-	
+
+	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
+		return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+	}
+
 	/* 
 	 * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
 	 * 
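
Both hunks above feed the estimator: the monitor loop invalidates a schedule's cached statistics whenever one of its runs finishes successfully, and the new getExecutableFlows overload simply delegates to the status-aware loader method. The invalidation guard, factored into a standalone hook (a hypothetical helper; the commit inlines this in the monitor loop):

import azkaban.executor.ExecutableFlow;
import azkaban.executor.Status;
import azkaban.scheduler.ScheduleStatisticManager;

public class StatCacheHook {
	public static void onFlowFinished(ExecutableFlow flow) {
		// Ad-hoc executions carry a schedule id < 0 and have no cache entry;
		// failed runs are skipped so they do not skew the duration estimate.
		if (flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED) {
			ScheduleStatisticManager.invalidateCache(flow.getScheduleId());
		}
	}
}
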
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index a50b4e1..8bfbf3c 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -203,6 +203,19 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	}
 	
 	@Override
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+		QueryRunner runner = createQueryRunner();
+		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+
+		try {
+			List<ExecutableFlow> flows = runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, flowHandler, projectId, flowId, status.getNumVal(), skip, num);
+			return flows;
+		} catch (SQLException e) {
+			throw new ExecutorManagerException("Error fetching flow history by status", e);
+		}
+	}
+	
+	@Override
 	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 
@@ -816,13 +829,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		//private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
 		private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
 		private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
+		private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
 		
 		@Override
 		public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
 			if (!rs.next()) {
 				return Collections.<ExecutableFlow>emptyList();
 			}
-
+			
 			List<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
 			do {
 				int id = rs.getInt(1);
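
FETCH_EXECUTABLE_FLOW_BY_STATUS binds the status as its numeric column value (status.getNumVal()) and pages with MySQL's LIMIT offset, count. A self-contained sketch of the same Commons DbUtils pattern, selecting only exec_id to keep it short (the DataSource wiring is assumed):

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;

public class StatusHistoryQuery {
	private static final String SQL =
			"SELECT exec_id FROM execution_flows "
			+ "WHERE project_id=? AND flow_id=? AND status=? "
			+ "ORDER BY exec_id DESC LIMIT ?, ?";

	public static List<Integer> fetchIds(DataSource ds, int projectId, String flowId, int statusNumVal, int skip, int num) throws SQLException {
		QueryRunner runner = new QueryRunner(ds);
		// Positional parameters fill the five ?s in declaration order.
		return runner.query(SQL, new ResultSetHandler<List<Integer>>() {
			@Override
			public List<Integer> handle(ResultSet rs) throws SQLException {
				List<Integer> ids = new ArrayList<Integer>();
				while (rs.next()) {
					ids.add(rs.getInt(1));
				}
				return ids;
			}
		}, projectId, flowId, statusNumVal, skip, num);
	}
}
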
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
new file mode 100644
index 0000000..28fc11c
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -0,0 +1,150 @@
+package azkaban.scheduler;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.AzkabanWebServer;
+
+public class ScheduleStatisticManager {
+	private static HashMap<Integer, Object> cacheLock = new HashMap<Integer, Object>();
+	private static File cacheDirectory = new File("cache/schedule-statistics");
+	private static final int STAT_NUMBERS = 10;
+
+	public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+		Map<String, Object> data = loadCache(scheduleId);
+		if (data != null) {
+			return data;
+		}
+
+		// Calculate data and cache it
+		data = calculateStats(scheduleId, server);
+
+		saveCache(scheduleId, data);
+
+		return data;
+	}
+
+	private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) {
+		Map<String, Object> data = new HashMap<String, Object>();
+		ExecutorManager executorManager = server.getExecutorManager();
+		ScheduleManager scheduleManager = server.getScheduleManager();
+		Schedule schedule = scheduleManager.getSchedule(scheduleId);
+
+		try {
+			List<ExecutableFlow> executables = executorManager.getExecutableFlows(schedule.getProjectId(), schedule.getFlowName(), 0, STAT_NUMBERS, Status.SUCCEEDED);
+
+			long average = 0;
+			long min = Long.MAX_VALUE;
+			long max = 0;
+			if (executables.isEmpty()) {
+				average = 0;
+				min = 0;
+				max = 0;
+			}
+			else {
+				for (ExecutableFlow flow : executables) {
+					long time = flow.getEndTime() - flow.getStartTime();
+					average += time;
+					if (time < min) {
+						min = time;
+					}
+					if (time > max) {
+						max = time;
+					}
+				}
+				average /= executables.size();
+			}
+
+			data.put("average", average);
+			data.put("min", min);
+			data.put("max", max);
+		} catch (ExecutorManagerException e) {
+			e.printStackTrace();
+		}
+
+		return data;
+	}
+
+	public static void invalidateCache(int scheduleId) {
+		// This should be silent and not fail
+		try {
+			Object lock = getLock(scheduleId);
+			synchronized (lock) {
+				getCacheFile(scheduleId).delete();
+			}
+			unLock(scheduleId);
+		} catch (Exception e) {
+		}
+	}
+
+	private static void saveCache(int scheduleId, Map<String, Object> data) {
+		Object lock = getLock(scheduleId);
+		try {
+			synchronized (lock) {
+				File cache = getCacheFile(scheduleId);
+				cache.createNewFile();
+				OutputStream output = new FileOutputStream(cache);
+				JSONUtils.toJSON(data, output, false);
+				output.close();
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		unLock(scheduleId);
+	}
+
+	private static Map<String, Object> loadCache(int scheduleId) {
+		Object lock = getLock(scheduleId);
+		try {
+			synchronized (lock) {
+				File cache = getCacheFile(scheduleId);
+				if (cache.exists() && cache.isFile()) {
+					Object dataObj = JSONUtils.parseJSONFromFile(cache);
+					if (dataObj instanceof Map<?, ?>) {
+						@SuppressWarnings("unchecked")
+						Map<String, Object> data = (Map<String, Object>) dataObj;
+						return data;
+					}
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		unLock(scheduleId);
+		return null;
+	}
+
+	private static File getCacheFile(int scheduleId) {
+		cacheDirectory.mkdirs();
+		File file = new File(cacheDirectory, scheduleId + ".cache");
+		return file;
+	}
+
+	private static Object getLock(int scheduleId) {
+		Object lock = null;
+		synchronized (cacheLock) {
+			lock = cacheLock.get(scheduleId);
+			if (lock == null) {
+				lock = new Object();
+				cacheLock.put(scheduleId, lock);
+			}
+		}
+
+		return lock;
+	}
+
+	private static void unLock(int scheduleId) {
+		synchronized (cacheLock) {
+			cacheLock.remove(scheduleId);
+		}
+	}
+}
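
One caveat in the file above: unLock() removes the per-schedule monitor from cacheLock, so two overlapping callers can obtain different lock objects for the same schedule id and race on the cache file (loadCache also returns from inside the synchronized block without ever unlocking, leaving stale entries behind). A hedged alternative, not part of this commit, keeps each monitor once created; the map then holds at most one small Object per schedule id ever touched:

import java.util.HashMap;

public class ScheduleLocks {
	private static final HashMap<Integer, Object> locks = new HashMap<Integer, Object>();

	// Always hands back the same monitor for a given schedule id, so every
	// reader, writer, and invalidator serializes on one object. Entries are
	// never removed, which avoids the re-creation race entirely.
	public static Object lockFor(int scheduleId) {
		synchronized (locks) {
			Object lock = locks.get(scheduleId);
			if (lock == null) {
				lock = new Object();
				locks.put(scheduleId, lock);
			}
			return lock;
		}
	}
}
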
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 0092999..312b7da 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -53,6 +53,7 @@ import azkaban.project.ProjectManager;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.ScheduleManagerException;
+import azkaban.scheduler.ScheduleStatisticManager;
 import azkaban.sla.SLA;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
@@ -373,6 +374,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+		Map<String, Object> stats = ScheduleStatisticManager.getStatistics(schedule.getScheduleId(), (AzkabanWebServer) getApplication());
 		HashMap<String, Object> data = new HashMap<String, Object>();
 		data.put("scheduleid", schedule.getScheduleId());
 		data.put("flowname", schedule.getFlowName());
@@ -381,13 +383,20 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 
 		DateTime time = DateTime.now();
 		long period = 0;
-		if(schedule.getPeriod() != null){
+		if (schedule.getPeriod() != null) {
 			period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
 		}
 		data.put("period", period);
-		data.put("length", 2 * 3600 * 1000);
+		int length = 3600 * 1000;
+		if (stats.get("average") instanceof Number) {
+			length = (int) ((Number) stats.get("average")).longValue();
+			if (length == 0) {
+				length = 3600 * 1000;
+			}
+		}
+		data.put("length", length);
 		data.put("history", false);
-
+		data.put("stats", stats);
 		output.add(data);
 	}
 	
@@ -407,7 +416,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		int loadAll = getIntParam(req, "loadAll");
 
 		// Cache file
-		File cache = new File("cache/" + startTime + ".cache");
+		File cache = new File("cache/schedule-history/" + startTime + ".cache");
 		cache.getParentFile().mkdirs();
 
 		if (useCache) {
@@ -453,7 +462,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 		
 		//Create cache file
-		File cacheTemp = new File("cache/" + startTime + ".tmp");
+		File cacheTemp = new File("cache/schedule-history/" + startTime + ".tmp");
 		cacheTemp.createNewFile();
 		OutputStream cacheOutput = new FileOutputStream(cacheTemp);
 
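
The servlet now sizes each schedule's bar on the timeline from the cached average duration (in milliseconds), falling back to one hour when no statistics exist yet or the average is zero. Factored out as a hypothetical helper, reading the value as a Number because JSON round-tripping may hand back either Integer or Long:

import java.util.Map;

public class ScheduleLength {
	private static final int ONE_HOUR_MS = 3600 * 1000;

	public static int displayLength(Map<String, Object> stats) {
		Object avg = stats.get("average");
		if (avg instanceof Number) {
			// Truncation to int is safe for any plausible flow duration.
			int length = (int) ((Number) avg).longValue();
			return length == 0 ? ONE_HOUR_MS : length;
		}
		return ONE_HOUR_MS;
	}
}
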
diff --git a/src/web/js/azkaban.schedule.svg.js b/src/web/js/azkaban.schedule.svg.js
index 5a840f9..e9e583c 100644
--- a/src/web/js/azkaban.schedule.svg.js
+++ b/src/web/js/azkaban.schedule.svg.js
@@ -432,10 +432,10 @@ $(function() {
 					{
 						var items = data.items;
 
 						//Sort items by day
 						for(var i = 0; i < items.length; i++)
 						{
-							items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
+							//items[i].length = hourMillisConst; //TODO: Remove this to get the actual length
 							items[i].objs = new Array();
 							items[i].color = "#69F";
 							processItem(items[i], true);
@@ -448,7 +450,7 @@ $(function() {
 					$.ajax({
 						type: "GET",
 						url: requestURL,
-						data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 1},
+						data: {"ajax": "loadHistory", "startTime": firstDay + deltaDay * dayMillisConst, "loadAll" : 0},
 						//dataType: "json",
 						success: function (data)
 						{
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6510ae3..2b81f82 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -11,6 +11,7 @@ import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionReference;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -193,5 +194,11 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return 0;
 	}
 
+	@Override
+	public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
 
 }
\ No newline at end of file