azkaban-developers
Changes
src/java/azkaban/scheduler/ScheduleManager.java 346(+4 -342)
Details
src/java/azkaban/scheduler/ScheduleManager.java 346(+4 -342)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index f0325af..efc431f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -47,22 +47,8 @@ public class ScheduleManager implements TriggerAgent {
private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
-// private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
-
-// private final ExecutorManagerAdapter executorManager;
-//
-// private ProjectManager projectManager = null;
-//
- // Used for mbeans to query Scheduler status
-//<<<<<<< HEAD
-//
-//=======
-// private long lastCheckTime = -1;
-// private long nextWakupTime = -1;
-// private String runnerStage = "not started";
-//>>>>>>> 10830aeb8ac819473873cac3bb4e07b4aeda67e8
/**
* Give the schedule manager a loader class that will properly load the
@@ -72,15 +58,10 @@ public class ScheduleManager implements TriggerAgent {
*/
public ScheduleManager (ScheduleLoader loader)
{
-// this.executorManager = executorManager;
this.loader = loader;
}
-// public void setProjectManager(ProjectManager projectManager) {
-// this.projectManager = projectManager;
-// }
-
@Override
public void start() throws ScheduleManagerException {
List<Schedule> scheduleList = null;
@@ -133,19 +114,7 @@ public class ScheduleManager implements TriggerAgent {
* @throws ScheduleManagerException
*/
public synchronized List<Schedule> getSchedules() throws ScheduleManagerException {
-// if(useExternalRunner) {
-// for(Schedule s : scheduleIDMap.values()) {
-// try {
-// loader.updateNextExecTime(s);
-// } catch (ScheduleManagerException e) {
-// // TODO Auto-generated catch block
-// e.printStackTrace();
-// logger.error("Failed to update schedule from external runner for schedule " + s.getScheduleId());
-// }
-// }
-// }
-
- //return runner.getRunnerSchedules();
+
updateLocal();
return new ArrayList<Schedule>(scheduleIDMap.values());
}
@@ -157,10 +126,7 @@ public class ScheduleManager implements TriggerAgent {
* @return
* @throws ScheduleManagerException
*/
-// public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
-// updateLocal();
-// return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
-// }
+
public Schedule getSchedule(int projectId, String flowId) throws ScheduleManagerException {
updateLocal();
return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
@@ -185,14 +151,7 @@ public class ScheduleManager implements TriggerAgent {
* @param id
* @throws ScheduleManagerException
*/
-// public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
-// Set<Schedule> schedules = getSchedules(projectId, flowId);
-// if(schedules != null) {
-// for(Schedule sched : schedules) {
-// removeSchedule(sched);
-// }
-// }
-// }
+
public synchronized void removeSchedule(int projectId, String flowId) throws ScheduleManagerException {
Schedule sched = getSchedule(projectId, flowId);
if(sched != null) {
@@ -206,13 +165,7 @@ public class ScheduleManager implements TriggerAgent {
*/
public synchronized void removeSchedule(Schedule sched) {
Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
-// Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
-// if(schedules != null) {
-// schedules.remove(sched);
-// if(schedules.size() == 0) {
-// scheduleIdentityPairMap.remove(identityPairMap);
-// }
-// }
+
Schedule schedule = scheduleIdentityPairMap.get(identityPairMap);
if(schedule != null) {
scheduleIdentityPairMap.remove(identityPairMap);
@@ -230,28 +183,6 @@ public class ScheduleManager implements TriggerAgent {
}
- // public synchronized void pauseScheduledFlow(String scheduleId){
- // try{
- // ScheduledFlow flow = scheduleIDMap.get(scheduleId);
- // flow.setSchedStatus(SchedStatus.LASTPAUSED);
- // loader.saveSchedule(getSchedule());
- // }
- // catch (Exception e) {
- // throw new RuntimeException("Error pausing a schedule " + scheduleId);
- // }
- // }
- //
- // public synchronized void resumeScheduledFlow(String scheduleId){
- // try {
- // ScheduledFlow flow = scheduleIDMap.get(scheduleId);
- // flow.setSchedStatus(SchedStatus.LASTSUCCESS);
- // loader.saveSchedule(getSchedule());
- // }
- // catch (Exception e) {
- // throw new RuntimeException("Error resuming a schedule " + scheduleId);
- // }
- // }
-
public Schedule scheduleFlow(
final int scheduleId,
final int projectId,
@@ -300,19 +231,7 @@ public class ScheduleManager implements TriggerAgent {
* @param flow
*/
private synchronized void internalSchedule(Schedule s) {
- //Schedule existing = scheduleIDMap.get(s.getScheduleId());
-// Schedule existing = null;
-// if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
-// existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-// }
-
scheduleIDMap.put(s.getScheduleId(), s);
-// Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-// if(schedules == null) {
-// schedules = new HashSet<Schedule>();
-// scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
-// }
-// schedules.add(s);
scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), s);
}
@@ -351,263 +270,6 @@ public class ScheduleManager implements TriggerAgent {
throw new ScheduleManagerException("create " + getTriggerSource() + " from json not supported yet" );
}
- /**
- * Thread that simply invokes the running of flows when the schedule is
- * ready.
- *
- * @author Richard Park
- *
- */
-// public class ScheduleRunner extends Thread {
-// private final PriorityBlockingQueue<Schedule> schedules;
-// private AtomicBoolean stillAlive = new AtomicBoolean(true);
-//
-// // Five minute minimum intervals
-// private static final int TIMEOUT_MS = 300000;
-//
-// public ScheduleRunner() {
-// schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
-// }
-//
-// public void shutdown() {
-// logger.error("Shutting down scheduler thread");
-// stillAlive.set(false);
-// this.interrupt();
-// }
-//
-// /**
-// * Return a list of scheduled flow
-// *
-// * @return
-// */
-// public synchronized List<Schedule> getRunnerSchedules() {
-// return new ArrayList<Schedule>(schedules);
-// }
-//
-// /**
-// * Adds the flow to the schedule and then interrupts so it will update
-// * its wait time.
-// *
-// * @param flow
-// */
-// public synchronized void addRunnerSchedule(Schedule s) {
-// logger.info("Adding " + s + " to schedule runner.");
-// schedules.add(s);
-// // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-// // System.currentTimeMillis(),
-// // WorkflowAction.SCHEDULE_WORKFLOW,
-// // WorkflowState.NOP,
-// // flow.getId());
-//
-// this.interrupt();
-// }
-//
-// /**
-// * Remove scheduled flows. Does not interrupt.
-// *
-// * @param flow
-// */
-// public synchronized void removeRunnerSchedule(Schedule s) {
-// logger.info("Removing " + s + " from the schedule runner.");
-// schedules.remove(s);
-// // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-// // System.currentTimeMillis(),
-// // WorkflowAction.UNSCHEDULE_WORKFLOW,
-// // WorkflowState.NOP,
-// // flow.getId());
-// // Don't need to interrupt, because if this is originally on the top
-// // of the queue,
-// // it'll just skip it.
-// }
-//
-// public void run() {
-// while (stillAlive.get()) {
-// synchronized (this) {
-// try {
-// lastCheckTime = System.currentTimeMillis();
-//
-// runnerStage = "Starting schedule scan.";
-// // TODO clear up the exception handling
-// Schedule s = schedules.peek();
-//
-// if (s == null) {
-// // If null, wake up every minute or so to see if
-// // there's something to do. Most likely there will not be.
-// try {
-// logger.info("Nothing scheduled to run. Checking again soon.");
-// runnerStage = "Waiting for next round scan.";
-// nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
-// this.wait(TIMEOUT_MS);
-// } catch (InterruptedException e) {
-// // interruption should occur when items are added or removed from the queue.
-// }
-// } else {
-// // We've passed the flow execution time, so we will run.
-// if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
-// // Run flow. The invocation of flows should be quick.
-// Schedule runningSched = schedules.poll();
-//
-// runnerStage = "Ready to run schedule " + runningSched.toString();
-//
-// logger.info("Scheduler ready to run " + runningSched.toString());
-// // Execute the flow here
-// try {
-// Project project = projectManager.getProject(runningSched.getProjectId());
-// if (project == null) {
-// logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
-// throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
-// }
-// //TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-//
-// Flow flow = project.getFlow(runningSched.getFlowName());
-// if (flow == null) {
-// logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
-// throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
-// }
-//
-// // Create ExecutableFlow
-// ExecutableFlow exflow = new ExecutableFlow(flow);
-// System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
-// exflow.setScheduleId(runningSched.getScheduleId());
-// exflow.setSubmitUser(runningSched.getSubmitUser());
-// exflow.addAllProxyUsers(project.getProxyUsers());
-//
-// ExecutionOptions flowOptions = runningSched.getExecutionOptions();
-// if(flowOptions == null) {
-// flowOptions = new ExecutionOptions();
-// flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
-// }
-// exflow.setExecutionOptions(flowOptions);
-//
-// if (!flowOptions.isFailureEmailsOverridden()) {
-// flowOptions.setFailureEmails(flow.getFailureEmails());
-// }
-// if (!flowOptions.isSuccessEmailsOverridden()) {
-// flowOptions.setSuccessEmails(flow.getSuccessEmails());
-// }
-//
-// runnerStage = "Submitting flow " + exflow.getFlowId();
-// flowOptions.setMailCreator(flow.getMailCreator());
-//
-// try {
-// executorManager.submitExecutableFlow(exflow);
-// logger.info("Scheduler has invoked " + exflow.getExecutionId());
-// }
-// catch (ExecutorManagerException e) {
-// throw e;
-// }
-// catch (Exception e) {
-// e.printStackTrace();
-// throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
-// }
-//
-// SlaOptions slaOptions = runningSched.getSlaOptions();
-// if(slaOptions != null) {
-// logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-// runnerStage = "Submitting SLA checkings for " + runningSched.getFlowName();
-// // submit flow slas
-// List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-// for(SlaSetting set : slaOptions.getSettings()) {
-// if(set.getId().equals("")) {
-// DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-// slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-// }
-// else {
-// jobsettings.add(set);
-// }
-// }
-// if(jobsettings.size() > 0) {
-// slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
-// }
-// }
-//
-// }
-// catch (ExecutorManagerException e) {
-// if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
-// logger.info(e.getMessage());
-// }
-// else {
-// e.printStackTrace();
-// }
-// }
-// catch (Exception e) {
-// logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
-// }
-//
-// runnerStage = "Done running schedule for " + runningSched.toString();
-// removeRunnerSchedule(runningSched);
-//
-// // Immediately reschedule if it's possible. Let
-// // the execution manager
-// // handle any duplicate runs.
-// if (runningSched.updateTime()) {
-// addRunnerSchedule(runningSched);
-// loader.updateSchedule(runningSched);
-// }
-// else {
-// removeSchedule(runningSched);
-// }
-// } else {
-// runnerStage = "Waiting for next round scan.";
-// // wait until flow run
-// long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
-// try {
-// nextWakupTime = System.currentTimeMillis() + millisWait;
-// this.wait(Math.min(millisWait, TIMEOUT_MS));
-// } catch (InterruptedException e) {
-// // interruption should occur when items are
-// // added or removed from the queue.
-// }
-// }
-// }
-// } catch (Exception e) {
-// logger.error("Unexpected exception has been thrown in scheduler", e);
-// } catch (Throwable e) {
-// logger.error("Unexpected throwable has been thrown in scheduler", e);
-// }
-// }
-// }
-// }
-//
-// /**
-// * Class to sort the schedule based on time.
-// *
-// * @author Richard Park
-// */
-// private class ScheduleComparator implements Comparator<Schedule> {
-// @Override
-// public int compare(Schedule arg0, Schedule arg1) {
-// long first = arg1.getNextExecTime();
-// long second = arg0.getNextExecTime();
-//
-// if (first == second) {
-// return 0;
-// } else if (first < second) {
-// return 1;
-// }
-//
-// return -1;
-// }
-// }
-// }
-
-// public long getLastCheckTime() {
-// return lastCheckTime;
-// }
-//
-// public long getNextUpdateTime() {
-// return nextWakupTime;
-// }
-//
-// public State getThreadState() {
-// return runner.getState();
-// }
-//
-// public boolean isThreadActive() {
-// return runner.isAlive();
-//>>>>>>> df6eb48ad044ae68afffae2254991289792f33a0
-// }
-
@Override
public String getTriggerSource() {
return triggerSource;