azkaban-uncached
Changes
src/java/azkaban/scheduler/ScheduleManager.java 55(+35 -20)
src/java/azkaban/trigger/TriggerManager.java 40(+30 -10)
Details
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 809aeef..77995c9 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -345,4 +345,10 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
return connection;
}
+
+ @Override
+ public List<Schedule> loadUpdatedSchedules()
+ throws ScheduleManagerException {
+ throw new ScheduleManagerException("Should never be called when using local schedule runner");
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
index 276b10e..71ad49d 100644
--- a/src/java/azkaban/scheduler/ScheduleLoader.java
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -29,5 +29,7 @@ public interface ScheduleLoader {
public void removeSchedule(Schedule s) throws ScheduleManagerException;
public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
-
+
+ // only use when using external runner
+ public List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException;
}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduleManager.java 55(+35 -20)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 1db4b10..49e48c3 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -118,13 +118,37 @@ public class ScheduleManager implements TriggerAgent {
}
for (Schedule sched : scheduleList) {
- internalSchedule(sched);
+ if(sched.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
+ onScheduleExpire(sched);
+ } else {
+ internalSchedule(sched);
+ }
}
if(!useExternalRunner) {
this.runner.start();
}
}
+
+ // only do this when using external runner
+ public synchronized void updateLocal() throws ScheduleManagerException {
+ if(!useExternalRunner) {
+ //throw new ScheduleManagerException("Should not change schedule states when using local runner!");
+ return;
+ }
+ List<Schedule> updates = loader.loadUpdatedSchedules();
+ for(Schedule s : updates) {
+ if(s.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
+ onScheduleExpire(s);
+ } else {
+ internalSchedule(s);
+ }
+ }
+ }
+
+ private void onScheduleExpire(Schedule s) {
+ removeSchedule(s);
+ }
/**
* Shutdowns the scheduler thread. After shutdown, it may not be safe to use
@@ -142,7 +166,7 @@ public class ScheduleManager implements TriggerAgent {
* @return
* @throws ScheduleManagerException
*/
- public synchronized List<Schedule> getSchedules() {
+ public synchronized List<Schedule> getSchedules() throws ScheduleManagerException {
// if(useExternalRunner) {
// for(Schedule s : scheduleIDMap.values()) {
// try {
@@ -156,6 +180,7 @@ public class ScheduleManager implements TriggerAgent {
// }
//return runner.getRunnerSchedules();
+ updateLocal();
return new ArrayList<Schedule>(scheduleIDMap.values());
}
@@ -164,8 +189,10 @@ public class ScheduleManager implements TriggerAgent {
*
* @param id
* @return
+ * @throws ScheduleManagerException
*/
- public Set<Schedule> getSchedules(int projectId, String flowId) {
+ public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
+ updateLocal();
return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
}
@@ -174,8 +201,10 @@ public class ScheduleManager implements TriggerAgent {
*
* @param id
* @return
+ * @throws ScheduleManagerException
*/
- public Schedule getSchedule(int scheduleId) {
+ public Schedule getSchedule(int scheduleId) throws ScheduleManagerException {
+ updateLocal();
return scheduleIDMap.get(scheduleId);
}
@@ -184,8 +213,9 @@ public class ScheduleManager implements TriggerAgent {
* Removes the flow from the schedule if it exists.
*
* @param id
+ * @throws ScheduleManagerException
*/
- public synchronized void removeSchedules(int projectId, String flowId) {
+ public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
Set<Schedule> schedules = getSchedules(projectId, flowId);
if(schedules != null) {
for(Schedule sched : schedules) {
@@ -348,21 +378,6 @@ public class ScheduleManager implements TriggerAgent {
return triggerSource;
}
- @Override
- public void updateLocal(Trigger t) {
- if(t.getStatus().equals(TriggerStatus.EXPIRED)) {
- removeSchedule(getSchedule(t.getTriggerId()));
- } else {
- try {
- loader.updateNextExecTime(getSchedule(t.getTriggerId()));
- } catch (ScheduleManagerException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- logger.error("Failed to get updated next execution time on schedule " + getSchedule(t.getTriggerId()).toString());
- }
- }
- }
-
/**
* Thread that simply invokes the running of flows when the schedule is
* ready.
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 28fc11c..4c430de 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -19,7 +19,7 @@ public class ScheduleStatisticManager {
private static File cacheDirectory = new File("cache/schedule-statistics");
private static final int STAT_NUMBERS = 10;
- public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) {
+ public static Map<String, Object> getStatistics(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
Map<String, Object> data = loadCache(scheduleId);
if (data != null) {
return data;
@@ -33,7 +33,7 @@ public class ScheduleStatisticManager {
return data;
}
- private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) {
+ private static Map<String, Object> calculateStats(int scheduleId, AzkabanWebServer server) throws ScheduleManagerException {
Map<String, Object> data = new HashMap<String, Object>();
ExecutorManager executorManager = server.getExecutorManager();
ScheduleManager scheduleManager = server.getScheduleManager();
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index b7e0478..311d673 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -17,6 +17,7 @@ import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.TriggerStatus;
public class TriggerBasedScheduleLoader implements ScheduleLoader {
@@ -26,7 +27,8 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private String triggerSource;
- private Map<Integer, Trigger> triggersLocalCopy;
+// private Map<Integer, Trigger> triggersLocalCopy;
+ private long lastUpdateTime = -1;
public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager, String triggerSource) {
this.triggerManager = triggerManager;
@@ -36,7 +38,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
ExecuteFlowAction.setProjectManager(projectManager);
}
- private Trigger createScheduleTrigger(Schedule s) {
+ private Trigger scheduleToTrigger(Schedule s) {
Condition triggerCondition = createTimeTriggerCondition(s);
Condition expireCondition = createTimeExpireCondition(s);
@@ -76,11 +78,11 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void insertSchedule(Schedule s) throws ScheduleManagerException {
- Trigger t = createScheduleTrigger(s);
+ Trigger t = scheduleToTrigger(s);
try {
triggerManager.insertTrigger(t);
s.setScheduleId(t.getTriggerId());
- triggersLocalCopy.put(t.getTriggerId(), t);
+// triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
throw new ScheduleManagerException("Failed to insert new schedule!", e);
@@ -89,10 +91,10 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void updateSchedule(Schedule s) throws ScheduleManagerException {
- Trigger t = createScheduleTrigger(s);
+ Trigger t = scheduleToTrigger(s);
try {
triggerManager.updateTrigger(t);
- triggersLocalCopy.put(t.getTriggerId(), t);
+// triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
throw new ScheduleManagerException("Failed to update schedule!", e);
@@ -103,16 +105,15 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
// may need to add logic to filter out skip runs
@Override
public synchronized List<Schedule> loadSchedules() throws ScheduleManagerException {
- List<Trigger> triggers = triggerManager.getTriggers();
+ List<Trigger> triggers = triggerManager.getTriggers(triggerSource);
List<Schedule> schedules = new ArrayList<Schedule>();
- triggersLocalCopy = new HashMap<Integer, Trigger>();
+// triggersLocalCopy = new HashMap<Integer, Trigger>();
for(Trigger t : triggers) {
- if(t.getSource().equals(triggerSource)) {
- triggersLocalCopy.put(t.getTriggerId(), t);
- Schedule s = triggerToSchedule(t);
- schedules.add(s);
- System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
- }
+// triggersLocalCopy.put(t.getTriggerId(), t);
+ lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+ Schedule s = triggerToSchedule(t);
+ schedules.add(s);
+ System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
}
return schedules;
@@ -142,7 +143,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
act.getProjectId(),
act.getProjectName(),
act.getFlowName(),
- "ready",
+ t.getStatus().toString(),
ck.getFirstCheckTime().getMillis(),
ck.getFirstCheckTime().getZone(),
ck.getPeriod(),
@@ -161,7 +162,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public void removeSchedule(Schedule s) throws ScheduleManagerException {
try {
triggerManager.removeTrigger(s.getScheduleId());
- triggersLocalCopy.remove(s.getScheduleId());
+// triggersLocalCopy.remove(s.getScheduleId());
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
throw new ScheduleManagerException(e.getMessage());
@@ -172,9 +173,22 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void updateNextExecTime(Schedule s)
throws ScheduleManagerException {
- Trigger t = triggersLocalCopy.get(s.getScheduleId());
- BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
- s.setNextExecTime(ck.getNextCheckTime().getMillis());
+// Trigger t = triggersLocalCopy.get(s.getScheduleId());
+// BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
+// s.setNextExecTime(ck.getNextCheckTime().getMillis());
+ }
+
+ @Override
+ public synchronized List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException {
+ List<Trigger> triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+ List<Schedule> schedules = new ArrayList<Schedule>();
+ for(Trigger t : triggers) {
+ lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+ Schedule s = triggerToSchedule(t);
+ schedules.add(s);
+ System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
+ }
+ return schedules;
}
}
diff --git a/src/java/azkaban/trigger/TriggerAgent.java b/src/java/azkaban/trigger/TriggerAgent.java
index f8cc4dc..f86d289 100644
--- a/src/java/azkaban/trigger/TriggerAgent.java
+++ b/src/java/azkaban/trigger/TriggerAgent.java
@@ -11,6 +11,7 @@ public interface TriggerAgent {
void load();
- public void updateLocal(Trigger t);
+// // update local copy
+// public void updateLocal();
}
src/java/azkaban/trigger/TriggerManager.java 40(+30 -10)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 1395340..11311dd 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -168,13 +168,13 @@ public class TriggerManager {
return checkerLoader.getSupportedCheckers();
}
- private void updateAgent(Trigger t) {
- TriggerAgent agent = triggerAgents.get(t.getSource());
- if(agent != null) {
- agent.updateLocal(t);
- }
-
- }
+// private void updateAgent(Trigger t) {
+// TriggerAgent agent = triggerAgents.get(t.getSource());
+// if(agent != null) {
+// agent.updateLocal(t);
+// }
+//
+// }
//trigger scanner thread
public class TriggerScannerThread extends Thread {
@@ -276,7 +276,7 @@ public class TriggerManager {
} else {
t.setStatus(TriggerStatus.EXPIRED);
}
- updateAgent(t);
+// updateAgent(t);
}
private void onTriggerExpire(Trigger t) throws TriggerManagerException {
@@ -287,7 +287,7 @@ public class TriggerManager {
} else {
t.setStatus(TriggerStatus.EXPIRED);
}
- updateAgent(t);
+// updateAgent(t);
}
}
@@ -298,7 +298,27 @@ public class TriggerManager {
public void expireTrigger(int triggerId) {
Trigger t = getTrigger(triggerId);
t.setStatus(TriggerStatus.EXPIRED);
- updateAgent(t);
+// updateAgent(t);
+ }
+
+ public List<Trigger> getTriggers(String triggerSource) {
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getSource().equals(triggerSource)) {
+ triggers.add(t);
+ }
+ }
+ return triggers;
+ }
+
+ public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) {
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for(Trigger t : triggerIdMap.values()) {
+ if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() > lastUpdateTime) {
+ triggers.add(t);
+ }
+ }
+ return triggers;
}
}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a542a82..edae8a6 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -39,6 +39,7 @@ import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
@@ -504,11 +505,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("failureEmails", flow.getFailureEmails());
Schedule sflow = null;
- for (Schedule sched: scheduleManager.getSchedules()) {
- if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(flowId)) {
- sflow = sched;
- break;
+ try {
+ for (Schedule sched: scheduleManager.getSchedules()) {
+ if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(flowId)) {
+ sflow = sched;
+ break;
+ }
}
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
}
if (sflow != null) {
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 0b2d413..989cca9 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -56,6 +56,7 @@ import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
import azkaban.user.Permission;
import azkaban.user.Role;
import azkaban.user.UserManager;
@@ -360,13 +361,19 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
// Check if scheduled
Schedule sflow = null;
- for (Schedule flow: scheduleManager.getSchedules()) {
+ try {
+ for (Schedule flow: scheduleManager.getSchedules()) {
- if (flow.getProjectId() == project.getId()) {
- sflow = flow;
- break;
+ if (flow.getProjectId() == project.getId()) {
+ sflow = flow;
+ break;
+ }
}
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
}
+
if (sflow != null) {
this.setErrorMessageInCookie(resp, "Cannot delete. Please unschedule " + sflow.getScheduleName() + ".");
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 4722257..8e71ee4 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -176,6 +176,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
} catch (ServletException e) {
ret.put("error", e.getMessage());
+ } catch (ScheduleManagerException e) {
+ ret.put("error", e.getMessage());
}
}
@@ -284,6 +286,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("allJobNames", allJobs);
} catch (ServletException e) {
ret.put("error", e);
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ ret.put("error", e);
}
}
@@ -309,7 +314,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/scheduledflowpage.vm");
- List<Schedule> schedules = scheduleManager.getSchedules();
+ List<Schedule> schedules;
+ try {
+ schedules = scheduleManager.getSchedules();
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
+ }
page.add("schedules", schedules);
//
// List<SLA> slas = slaManager.getSLAs();
@@ -323,7 +334,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/scheduledflowcalendarpage.vm");
- List<Schedule> schedules = scheduleManager.getSchedules();
+ List<Schedule> schedules;
+ try {
+ schedules = scheduleManager.getSchedules();
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
+ }
page.add("schedules", schedules);
//
// List<SLA> slas = slaManager.getSLAs();
@@ -360,7 +377,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private void ajaxLoadFlows(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
- List<Schedule> schedules = scheduleManager.getSchedules();
+ List<Schedule> schedules;
+ try {
+ schedules = scheduleManager.getSchedules();
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
+ }
// See if anything is scheduled
if (schedules.size() <= 0)
return;
@@ -369,11 +392,16 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("items", output);
for (Schedule schedule : schedules) {
- writeScheduleData(output, schedule);
+ try {
+ writeScheduleData(output, schedule);
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
+ }
}
}
- private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) {
+ private void writeScheduleData(List<HashMap<String, Object>> output, Schedule schedule) throws ScheduleManagerException {
Map<String, Object> stats = ScheduleStatisticManager.getStatistics(schedule.getScheduleId(), (AzkabanWebServer) getApplication());
HashMap<String, Object> data = new HashMap<String, Object>();
data.put("scheduleid", schedule.getScheduleId());
@@ -499,7 +527,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
int scheduleId = getIntParam(req, "scheduleId");
- Schedule sched = scheduleManager.getSchedule(scheduleId);
+ Schedule sched;
+ try {
+ sched = scheduleManager.getSchedule(scheduleId);
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ServletException(e);
+ }
if(sched == null) {
ret.put("message", "Schedule with ID " + scheduleId + " does not exist");
ret.put("status", "error");