azkaban-uncached

Details

diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 809aeef..77995c9 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -345,4 +345,10 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 		
 		return connection;
 	}
+
+	@Override
+	public List<Schedule> loadUpdatedSchedules()
+			throws ScheduleManagerException {
+		throw new ScheduleManagerException("Should never be called when using local schedule runner");
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
index 276b10e..71ad49d 100644
--- a/src/java/azkaban/scheduler/ScheduleLoader.java
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -29,5 +29,7 @@ public interface ScheduleLoader {
 	public void removeSchedule(Schedule s) throws ScheduleManagerException;
 
 	public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
-
+	
+	// only use when using external runner
+	public List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException;
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 1db4b10..49e48c3 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -118,13 +118,37 @@ public class ScheduleManager implements TriggerAgent {
 		}
 
 		for (Schedule sched : scheduleList) {
-			internalSchedule(sched);
+			if(sched.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
+				onScheduleExpire(sched);
+			} else {
+				internalSchedule(sched);
+			}
 		}
 
 		if(!useExternalRunner) {
 			this.runner.start();
 		}
 	}
+	
+	// only do this when using external runner
+	public synchronized void updateLocal() throws ScheduleManagerException {
+		if(!useExternalRunner) {
+			//throw new ScheduleManagerException("Should not change schedule states when using local runner!");
+			return;
+		}
+		List<Schedule> updates = loader.loadUpdatedSchedules();
+		for(Schedule s : updates) {
+			if(s.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
+				onScheduleExpire(s);
+			} else {
+				internalSchedule(s);
+			}
+		}
+	}
+	
+	private void onScheduleExpire(Schedule s) {
+		removeSchedule(s);
+	}
 
 	/**
 	 * Shutdowns the scheduler thread. After shutdown, it may not be safe to use
@@ -142,7 +166,7 @@ public class ScheduleManager implements TriggerAgent {
 	 * @return
 	 * @throws ScheduleManagerException 
 	 */
-	public synchronized List<Schedule> getSchedules() {
+	public synchronized List<Schedule> getSchedules() throws ScheduleManagerException {
 //		if(useExternalRunner) {
 //			for(Schedule s : scheduleIDMap.values()) {
 //				try {
@@ -156,6 +180,7 @@ public class ScheduleManager implements TriggerAgent {
 //		}
 		
 		//return runner.getRunnerSchedules();
+		updateLocal();
 		return new ArrayList<Schedule>(scheduleIDMap.values());
 	}
 
@@ -164,8 +189,10 @@ public class ScheduleManager implements TriggerAgent {
 	 * 
 	 * @param id
 	 * @return
+	 * @throws ScheduleManagerException 
 	*/
-	public Set<Schedule> getSchedules(int projectId, String flowId) {
+	public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
+		updateLocal();
 		return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
 	}
 
@@ -174,8 +201,10 @@ public class ScheduleManager implements TriggerAgent {
 	 * 
 	 * @param id
 	 * @return
+	 * @throws ScheduleManagerException 
 	*/
-	public Schedule getSchedule(int scheduleId) {
+	public Schedule getSchedule(int scheduleId) throws ScheduleManagerException {
+		updateLocal();
 		return scheduleIDMap.get(scheduleId);
 	}
 
@@ -184,8 +213,9 @@ public class ScheduleManager implements TriggerAgent {
 	 * Removes the flow from the schedule if it exists.
 	 * 
 	 * @param id
+	 * @throws ScheduleManagerException 
 	 */
-	public synchronized void removeSchedules(int projectId, String flowId) {
+	public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
 		Set<Schedule> schedules = getSchedules(projectId, flowId);
 		if(schedules != null) {
 			for(Schedule sched : schedules) {
@@ -348,21 +378,6 @@ public class ScheduleManager implements TriggerAgent {
 		return triggerSource;
 	}
 	
-	@Override
-	public void updateLocal(Trigger t) {
-		if(t.getStatus().equals(TriggerStatus.EXPIRED)) {
-			removeSchedule(getSchedule(t.getTriggerId()));
-		} else {
-			try {
-				loader.updateNextExecTime(getSchedule(t.getTriggerId()));
-			} catch (ScheduleManagerException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-				logger.error("Failed to get updated next execution time on schedule " + getSchedule(t.getTriggerId()).toString());
-			}
-		}
-	}
-
 	/**
 	 * Thread that simply invokes the running of flows when the schedule is
 	 * ready.
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 28fc11c..4c430de 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -19,7 +19,7 @@ public class ScheduleStatisticManager {
 	private static File cacheDirectory = new File("cache/schedule-statistics");
 	private static final int STAT_NUMBERS = 10;
 
-	public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+	public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
 		Map<String, Object> data = loadCache(scheduleId);
 		if (data != null) {
 			return data;
@@ -33,7 +33,7 @@ public class ScheduleStatisticManager {
 		return data;
 	}
 
-	private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) {
+	private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
 		Map<String, Object> data = new HashMap<String, Object>();
 		ExecutorManager executorManager = server.getExecutorManager();
 		ScheduleManager scheduleManager = server.getScheduleManager();
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index b7e0478..311d673 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -17,6 +17,7 @@ import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.TriggerStatus;
 
 public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
@@ -26,7 +27,8 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
 	private String triggerSource;
 	
-	private Map<Integer, Trigger> triggersLocalCopy;
+//	private Map<Integer, Trigger> triggersLocalCopy;
+	private long lastUpdateTime = -1;
 	
 	public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager, String triggerSource) {
 		this.triggerManager = triggerManager;
@@ -36,7 +38,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		ExecuteFlowAction.setProjectManager(projectManager);
 	}
 	
-	private Trigger createScheduleTrigger(Schedule s) {
+	private Trigger scheduleToTrigger(Schedule s) {
 		
 		Condition triggerCondition = createTimeTriggerCondition(s);
 		Condition expireCondition = createTimeExpireCondition(s);
@@ -76,11 +78,11 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 
 	@Override
 	public void insertSchedule(Schedule s) throws ScheduleManagerException {
-		Trigger t = createScheduleTrigger(s);
+		Trigger t = scheduleToTrigger(s);
 		try {
 			triggerManager.insertTrigger(t);
 			s.setScheduleId(t.getTriggerId());
-			triggersLocalCopy.put(t.getTriggerId(), t);
+//			triggersLocalCopy.put(t.getTriggerId(), t);
 		} catch (TriggerManagerException e) {
 			// TODO Auto-generated catch block
 			throw new ScheduleManagerException("Failed to insert new schedule!", e);
@@ -89,10 +91,10 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 
 	@Override
 	public void updateSchedule(Schedule s) throws ScheduleManagerException {
-		Trigger t = createScheduleTrigger(s);
+		Trigger t = scheduleToTrigger(s);
 		try {
 			triggerManager.updateTrigger(t);
-			triggersLocalCopy.put(t.getTriggerId(), t);
+//			triggersLocalCopy.put(t.getTriggerId(), t);
 		} catch (TriggerManagerException e) {
 			// TODO Auto-generated catch block
 			throw new ScheduleManagerException("Failed to update schedule!", e);
@@ -103,16 +105,15 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	// may need to add logic to filter out skip runs
 	@Override
 	public synchronized List<Schedule> loadSchedules() throws ScheduleManagerException {
-		List<Trigger> triggers = triggerManager.getTriggers();
+		List<Trigger> triggers = triggerManager.getTriggers(triggerSource);
 		List<Schedule> schedules = new ArrayList<Schedule>();
-		triggersLocalCopy = new HashMap<Integer, Trigger>();
+//		triggersLocalCopy = new HashMap<Integer, Trigger>();
 		for(Trigger t : triggers) {
-			if(t.getSource().equals(triggerSource)) {
-				triggersLocalCopy.put(t.getTriggerId(), t);
-				Schedule s = triggerToSchedule(t);
-				schedules.add(s);
-				System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
-			}
+//				triggersLocalCopy.put(t.getTriggerId(), t);
+			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+			Schedule s = triggerToSchedule(t);
+			schedules.add(s);
+			System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
 		}
 		return schedules;
 		
@@ -142,7 +143,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 					act.getProjectId(), 
 					act.getProjectName(), 
 					act.getFlowName(), 
-					"ready", 
+					t.getStatus().toString(), 
 					ck.getFirstCheckTime().getMillis(), 
 					ck.getFirstCheckTime().getZone(), 
 					ck.getPeriod(),
@@ -161,7 +162,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	public void removeSchedule(Schedule s) throws ScheduleManagerException {
 		try {
 			triggerManager.removeTrigger(s.getScheduleId());
-			triggersLocalCopy.remove(s.getScheduleId());
+//			triggersLocalCopy.remove(s.getScheduleId());
 		} catch (TriggerManagerException e) {
 			// TODO Auto-generated catch block
 			throw new ScheduleManagerException(e.getMessage());
@@ -172,9 +173,22 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	@Override
 	public void updateNextExecTime(Schedule s)
 			throws ScheduleManagerException {
-		Trigger t = triggersLocalCopy.get(s.getScheduleId());
-		BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
-		s.setNextExecTime(ck.getNextCheckTime().getMillis());
+//		Trigger t = triggersLocalCopy.get(s.getScheduleId());
+//		BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
+//		s.setNextExecTime(ck.getNextCheckTime().getMillis());
+	}
+
+	@Override
+	public synchronized List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException {
+		List<Trigger> triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+		List<Schedule> schedules = new ArrayList<Schedule>();
+		for(Trigger t : triggers) {
+			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+			Schedule s = triggerToSchedule(t);
+			schedules.add(s);
+			System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
+		}
+		return schedules;
 	}
 
 }
diff --git a/src/java/azkaban/trigger/TriggerAgent.java b/src/java/azkaban/trigger/TriggerAgent.java
index f8cc4dc..f86d289 100644
--- a/src/java/azkaban/trigger/TriggerAgent.java
+++ b/src/java/azkaban/trigger/TriggerAgent.java
@@ -11,6 +11,7 @@ public interface TriggerAgent {
 	
 	void load();
 
-	public void updateLocal(Trigger t);
+//	// update local copy
+//	public void updateLocal();
 
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 1395340..11311dd 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -168,13 +168,13 @@ public class TriggerManager {
 		return checkerLoader.getSupportedCheckers();
 	}
 
-	private void updateAgent(Trigger t) {
-		TriggerAgent agent = triggerAgents.get(t.getSource());
-		if(agent != null) {
-			agent.updateLocal(t);
-		}
-		
-	}
+//	private void updateAgent(Trigger t) {
+//		TriggerAgent agent = triggerAgents.get(t.getSource());
+//		if(agent != null) {
+//			agent.updateLocal(t);
+//		}
+//		
+//	}
 	
 	//trigger scanner thread
 	public class TriggerScannerThread extends Thread {
@@ -276,7 +276,7 @@ public class TriggerManager {
 			} else {
 				t.setStatus(TriggerStatus.EXPIRED);
 			}
-			updateAgent(t);
+//			updateAgent(t);
 		}
 		
 		private void onTriggerExpire(Trigger t) throws TriggerManagerException {
@@ -287,7 +287,7 @@ public class TriggerManager {
 			} else {
 				t.setStatus(TriggerStatus.EXPIRED);
 			}
-			updateAgent(t);
+//			updateAgent(t);
 		}
 	}
 
@@ -298,7 +298,27 @@ public class TriggerManager {
 	public void expireTrigger(int triggerId) {
 		Trigger t = getTrigger(triggerId);
 		t.setStatus(TriggerStatus.EXPIRED);
-		updateAgent(t);
+//		updateAgent(t);
+	}
+
+	public List<Trigger> getTriggers(String triggerSource) {
+		List<Trigger> triggers = new ArrayList<Trigger>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getSource().equals(triggerSource)) {
+				triggers.add(t);
+			}
+		}
+		return triggers;
+	}
+
+	public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) {
+		List<Trigger> triggers = new ArrayList<Trigger>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() > lastUpdateTime) {
+				triggers.add(t);
+			}
+		}
+		return triggers;
 	}
 
 }
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a542a82..edae8a6 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -39,6 +39,7 @@ import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
@@ -504,11 +505,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("failureEmails", flow.getFailureEmails());
 		
 		Schedule sflow = null;
-		for (Schedule sched: scheduleManager.getSchedules()) {
-			if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(flowId)) {
-				sflow = sched;
-				break;
+		try {
+			for (Schedule sched: scheduleManager.getSchedules()) {
+				if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(flowId)) {
+					sflow = sched;
+					break;
+				}
 			}
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ServletException(e);
 		}
 		
 		if (sflow != null) {
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 0b2d413..989cca9 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -56,6 +56,7 @@ import azkaban.project.ProjectManager;
 import azkaban.project.ProjectManagerException;
 import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
 import azkaban.user.Permission;
 import azkaban.user.Role;
 import azkaban.user.UserManager;
@@ -360,13 +361,19 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		
 		// Check if scheduled
 		Schedule sflow = null;
-		for (Schedule flow: scheduleManager.getSchedules()) {
+		try {
+			for (Schedule flow: scheduleManager.getSchedules()) {
 
-			if (flow.getProjectId() == project.getId()) {
-				sflow = flow;
-				break;
+				if (flow.getProjectId() == project.getId()) {
+					sflow = flow;
+					break;
+				}
 			}
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ServletException(e);
 		}
+		
 		if (sflow != null) {
 			this.setErrorMessageInCookie(resp, "Cannot delete. Please unschedule " + sflow.getScheduleName() + ".");
 
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 4722257..8e71ee4 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -176,6 +176,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			
 		} catch (ServletException e) {
 			ret.put("error", e.getMessage());
+		} catch (ScheduleManagerException e) {
+			ret.put("error", e.getMessage());
 		}
 		
 	}
@@ -284,6 +286,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("allJobNames", allJobs);
 		} catch (ServletException e) {
 			ret.put("error", e);
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			ret.put("error", e);
 		}
 		
 	}
@@ -309,7 +314,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		
 		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/scheduledflowpage.vm");
 		
-		List<Schedule> schedules = scheduleManager.getSchedules();
+		List<Schedule> schedules;
+		try {
+			schedules = scheduleManager.getSchedules();
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ServletException(e);
+		}
 		page.add("schedules", schedules);
 //		
 //		List<SLA> slas = slaManager.getSLAs();
@@ -323,7 +334,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		
 		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/scheduledflowcalendarpage.vm");
 		
-		List<Schedule> schedules = scheduleManager.getSchedules();
+		List<Schedule> schedules;
+		try {
+			schedules = scheduleManager.getSchedules();
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ServletException(e);
+		}
 		page.add("schedules", schedules);
 //		
 //		List<SLA> slas = slaManager.getSLAs();
@@ -360,7 +377,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 
 	private void ajaxLoadFlows(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
 		
-		List<Schedule> schedules = scheduleManager.getSchedules();
+		List<Schedule> schedules;
+		try {
+			schedules = scheduleManager.getSchedules();
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ServletException(e);
+		}
 		// See if anything is scheduled
 		if (schedules.size() <= 0)
 			return;
@@ -369,11 +392,16 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		ret.put("items", output);
 
 		for (Schedule schedule : schedules) {
-			writeScheduleData(output, schedule);
+			try {
+				writeScheduleData(output, schedule);
+			} catch (ScheduleManagerException e) {
+				// TODO Auto-generated catch block
+				throw new ServletException(e);
+			}
 		}
 	}
 
-	private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+	private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) throws ScheduleManagerException {
 		Map<String, Object> stats = ScheduleStatisticManager.getStatistics(schedule.getScheduleId(), (AzkabanWebServer) getApplication());
 		HashMap<String, Object> data = new HashMap<String, Object>();
 		data.put("scheduleid", schedule.getScheduleId());
@@ -499,7 +527,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 
 	private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
 		int scheduleId = getIntParam(req, "scheduleId");
-		Schedule sched = scheduleManager.getSchedule(scheduleId);
+		Schedule sched;
+		try {
+			sched = scheduleManager.getSchedule(scheduleId);
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ServletException(e);
+		}
 		if(sched == null) {
 			ret.put("message", "Schedule with ID " + scheduleId + " does not exist");
 			ret.put("status", "error");