package azkaban.scheduler;
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormat;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.jobExecutor.utils.JobExecutionException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.utils.Props;
/**
* The ScheduleManager stores and executes the schedule. It uses a single
* thread that waits until the next scheduled run time of a flow. It does not
* remove the flow from the schedule when it is run, which can potentially
* allow successive runs of the same flow to overlap each other.
*/
public class ScheduleManager {
/** Logger shared by the manager and its nested scheduler thread. */
private static final Logger logger = Logger.getLogger(ScheduleManager.class);
/** Formats the first scheduled time of a flow for log messages. */
private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
/** Loads the schedule at construction time and saves it after every change. */
private final ScheduleLoader loader;
/** Scheduled flows keyed by schedule id; only accessed under this manager's monitor. */
private final Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
/** Background thread that launches flows when they become due. */
private final ScheduleRunner runner;
/** Used by the scheduler thread to create, set up and launch executions. */
private final ExecutorManager executorManager;
/** Used by the scheduler thread to resolve projects, flows and their source files. */
private final ProjectManager projectManager;
/**
 * Creates the manager, registers every flow returned by the loader and then
 * starts the scheduler thread.
 *
 * @param executorManager used by the scheduler thread to create and launch executions
 * @param projectManager used by the scheduler thread to look up projects and flows
 * @param loader loads the initial schedule and saves it after later changes
 */
public ScheduleManager(ExecutorManager executorManager,
        ProjectManager projectManager,
        ScheduleLoader loader) {
    this.loader = loader;
    this.projectManager = projectManager;
    this.executorManager = executorManager;
    this.runner = new ScheduleRunner();

    // Loaded flows are registered through internalSchedule(), which does not
    // write the schedule back to the loader.
    for (ScheduledFlow persisted : loader.loadSchedule()) {
        internalSchedule(persisted);
    }

    runner.start();
}
/**
 * Stops the scheduler thread by delegating to the runner's shutdown. The
 * manager may not be safe to use afterwards.
 */
public void shutdown() {
    runner.shutdown();
}
/**
 * Retrieves a copy of the list of schedules currently held by the runner.
 *
 * @return a new list containing the scheduled flows
 */
public synchronized List<ScheduledFlow> getSchedule() {
    final List<ScheduledFlow> snapshot = runner.getSchedule();
    return snapshot;
}
/**
 * Returns the scheduled flow registered under the given schedule id.
 *
 * Synchronized like every other accessor of the id map: the map is a plain
 * LinkedHashMap that is modified under this manager's monitor, so reading it
 * without the monitor would race with concurrent schedule/remove calls.
 *
 * @param scheduleId id of the schedule to look up
 * @return the scheduled flow, or null if no schedule has that id
 */
public synchronized ScheduledFlow getSchedule(String scheduleId) {
    return scheduleIDMap.get(scheduleId);
}
/**
 * Removes the flow from the schedule if it exists and saves the updated
 * schedule.
 *
 * @param scheduleId id of the schedule to remove
 * @return the removed flow, or null if no schedule has that id (in which
 *         case nothing is changed and nothing is saved)
 */
public synchronized ScheduledFlow removeScheduledFlow(String scheduleId) {
    ScheduledFlow flow = scheduleIDMap.remove(scheduleId);
    if (flow == null) {
        // Unknown id: the schedule is unchanged, so do not hand a null to the
        // runner or rewrite the persisted schedule.
        return null;
    }

    runner.removeScheduledFlow(flow);
    saveSchedule();
    return flow;
}
// public synchronized void pauseScheduledFlow(String scheduleId){
// try{
// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
// flow.setSchedStatus(SchedStatus.LASTPAUSED);
// loader.saveSchedule(getSchedule());
// }
// catch (Exception e) {
// throw new RuntimeException("Error pausing a schedule " + scheduleId);
// }
// }
//
// public synchronized void resumeScheduledFlow(String scheduleId){
// try {
// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
// flow.setSchedStatus(SchedStatus.LASTSUCCESS);
// loader.saveSchedule(getSchedule());
// }
// catch (Exception e) {
// throw new RuntimeException("Error resuming a schedule " + scheduleId);
// }
// }
/**
 * Builds a ScheduledFlow with the given period, adds it to the schedule and
 * saves the schedule.
 *
 * @param scheduleId id under which the flow is scheduled
 * @param projectId id of the project containing the flow
 * @param flowId id of the flow to run
 * @param user user passed to the ScheduledFlow
 * @param userSubmit submitting user passed to the ScheduledFlow
 * @param submitTime submission time passed to the ScheduledFlow
 * @param firstSchedTime first scheduled time
 * @param period period passed to the ScheduledFlow
 * @return the newly scheduled flow
 */
public ScheduledFlow schedule(
        final String scheduleId,
        final String projectId,
        final String flowId,
        final String user,
        final String userSubmit,
        final DateTime submitTime,
        final DateTime firstSchedTime,
        final ReadablePeriod period) {
    final String firstRunText = _dateFormat.print(firstSchedTime);
    final String periodText = PeriodFormat.getDefault().print(period);
    logger.info("Scheduling flow '" + scheduleId + "' for " + firstRunText
            + " with a period of " + periodText);

    final ScheduledFlow created = new ScheduledFlow(
            scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, period);
    schedule(created);
    return created;
}
/**
 * Builds a ScheduledFlow without specifying a period, adds it to the
 * schedule and saves the schedule.
 *
 * @param scheduleId id under which the flow is scheduled
 * @param projectId id of the project containing the flow
 * @param flowId id of the flow to run
 * @param user user passed to the ScheduledFlow
 * @param userSubmit submitting user passed to the ScheduledFlow
 * @param submitTime submission time passed to the ScheduledFlow
 * @param firstSchedTime first scheduled time
 * @return the newly scheduled flow
 */
public ScheduledFlow schedule(
        String scheduleId,
        String projectId,
        String flowId,
        String user,
        String userSubmit,
        DateTime submitTime,
        DateTime firstSchedTime) {
    final String firstRunText = _dateFormat.print(firstSchedTime);
    logger.info("Scheduling flow '" + scheduleId + "' for " + firstRunText);

    final ScheduledFlow created = new ScheduledFlow(
            scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime);
    schedule(created);
    return created;
}
/**
 * Registers the flow with the runner and the id map without saving the
 * schedule. A flow already registered under the same schedule id is removed
 * from the runner first, so the new flow replaces it.
 *
 * @param flow the flow to register
 */
private synchronized void internalSchedule(ScheduledFlow flow) {
    final ScheduledFlow previous = scheduleIDMap.get(flow.getScheduleId());

    // Refresh the flow's next execution time before it is queued.
    flow.updateTime();

    if (null != previous) {
        runner.removeScheduledFlow(previous);
    }
    runner.addScheduledFlow(flow);
    scheduleIDMap.put(flow.getScheduleId(), flow);
}
/**
 * Adds a flow to the schedule and then saves the whole schedule through the
 * loader.
 *
 * @param flow the flow to add
 */
public synchronized void schedule(ScheduledFlow flow) {
    this.internalSchedule(flow);
    this.saveSchedule();
}
/**
 * Saves the current schedule through the loader.
 *
 * The snapshot is taken directly from the runner instead of through the
 * manager's synchronized getSchedule(). The scheduler thread calls this
 * method while holding the runner's monitor; taking the manager's monitor
 * here as well would invert the lock order used by schedule(ScheduledFlow)
 * (manager monitor first, then runner monitor) and could deadlock the two
 * threads.
 */
private void saveSchedule() {
    loader.saveSchedule(runner.getSchedule());
}
/**
* Thread that simply invokes the running of flows when the schedule is
* ready.
*
* @author Richard Park
*
*/
public class ScheduleRunner extends Thread {
/** Pending flows, ordered by ScheduleComparator. */
private final PriorityBlockingQueue<ScheduledFlow> schedule;
/** Cleared by shutdown() to make the run loop exit. */
private final AtomicBoolean stillAlive = new AtomicBoolean(true);
/** Upper bound, in milliseconds, on a single wait of the run loop (five minutes). */
private static final int TIMEOUT_MS = 300000;

/** Creates the runner with an empty queue; the thread is not started here. */
public ScheduleRunner() {
    schedule = new PriorityBlockingQueue<ScheduledFlow>(1, new ScheduleComparator());
}
/**
 * Asks the scheduler thread to stop: clears the alive flag and interrupts
 * the thread so a pending wait returns.
 */
public void shutdown() {
    logger.error("Shutting down scheduler thread");
    stillAlive.set(false);
    interrupt();
}
/**
 * Returns the scheduled flows as a new list, so callers never see the
 * runner's internal queue.
 *
 * @return a copy of the queue contents
 */
public synchronized List<ScheduledFlow> getSchedule() {
    final List<ScheduledFlow> snapshot = new ArrayList<ScheduledFlow>(schedule);
    return snapshot;
}
/**
 * Queues the flow and interrupts the scheduler thread so that it
 * recomputes how long to wait.
 *
 * @param flow the flow to add to the queue
 */
public synchronized void addScheduledFlow(ScheduledFlow flow) {
    logger.info("Adding " + flow + " to schedule.");
    schedule.offer(flow);
    interrupt();
}
/**
 * Takes the flow out of the queue. The scheduler thread is not
 * interrupted: it looks at the head of the queue again each time it
 * wakes, so a removed flow is simply no longer found.
 *
 * @param flow the flow to remove from the queue
 */
public synchronized void removeScheduledFlow(ScheduledFlow flow) {
    logger.info("Removing " + flow + " from the schedule.");
    schedule.remove(flow);
}
/**
 * Scheduler loop. While the runner is alive it looks at the head of the
 * queue and either waits (at most TIMEOUT_MS per wait) or, when the head
 * flow is due, launches it, refreshes its next execution time, re-queues
 * it if updateTime() reports another run, and saves the schedule.
 *
 * A flow that fails to launch is logged and still goes through the
 * reschedule step; a launch failure never terminates the scheduler
 * thread. The whole iteration runs while holding this runner's monitor.
 */
@Override
public void run() {
    while (stillAlive.get()) {
        synchronized (this) {
            try {
                ScheduledFlow schedFlow = schedule.peek();
                if (schedFlow == null) {
                    // Nothing is scheduled: sleep until a flow is added or
                    // shutdown is requested (both interrupt this thread),
                    // or until the five minute timeout elapses.
                    try {
                        this.wait(TIMEOUT_MS);
                    } catch (InterruptedException ignored) {
                        // Interrupts are the wake-up signal here; the loop
                        // re-checks stillAlive and the queue.
                    }
                } else if (!schedFlow.getNextExecTime().isAfterNow()) {
                    // The head flow is due. Launching should be quick.
                    ScheduledFlow runningFlow = schedule.poll();
                    logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());

                    launchFlow(runningFlow);

                    schedule.remove(runningFlow);
                    // Immediately reschedule if it's possible. Let the
                    // execution manager handle any duplicate runs.
                    if (runningFlow.updateTime()) {
                        schedule.add(runningFlow);
                    }
                    saveSchedule();
                } else {
                    long millisWait = schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis();
                    // wait(0) means "wait until notified", not "do not
                    // wait", so only wait for a strictly positive delay. If
                    // the flow became due in the meantime, loop right away.
                    if (millisWait > 0) {
                        try {
                            this.wait(Math.min(millisWait, TIMEOUT_MS));
                        } catch (InterruptedException ignored) {
                            // Interrupts are the wake-up signal here; the
                            // loop re-checks stillAlive and the queue.
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("Unexpected exception has been thrown in scheduler", e);
            } catch (Throwable e) {
                logger.error("Unexpected throwable has been thrown in scheduler", e);
            }
        }
    }
}

/**
 * Resolves the project and flow of a due schedule, creates an executable
 * flow, sets up its execution directory, copies the project sources into
 * it and submits it for execution. Every failure is logged and makes the
 * method return normally so the caller can still reschedule the flow.
 *
 * @param runningFlow the scheduled flow that is due
 */
private void launchFlow(ScheduledFlow runningFlow) {
    try {
        Project project = projectManager.getProject(runningFlow.getProjectId());
        if (project == null) {
            logger.error("Scheduled Project " + runningFlow.getProjectId() + " does not exist!");
            return;
        }

        Flow flow = project.getFlow(runningFlow.getFlowId());
        if (flow == null) {
            logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
            return;
        }

        // The returned properties are not used here; the call is kept so
        // that a flow whose properties cannot be loaded is rejected before
        // an execution is created.
        try {
            projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
        } catch (ProjectManagerException e) {
            logger.error("Error getting the flow resources. " + runningFlow.getScheduleId(), e);
            return;
        }

        // Create ExecutableFlow
        ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
        exflow.setSubmitUser(runningFlow.getUser());
        // TODO make disabled in scheduled flow

        // Create directory
        try {
            executorManager.setupExecutableFlow(exflow);
        } catch (ExecutorManagerException e) {
            cleanupQuietly(exflow);
            logger.error(e.getMessage(), e);
            return;
        }

        // Copy files to the source.
        File executionDir = new File(exflow.getExecutionPath());
        try {
            projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
        } catch (ProjectManagerException e) {
            cleanupQuietly(exflow);
            logger.error(e.getMessage(), e);
            return;
        }

        try {
            executorManager.executeFlow(exflow);
            project.info("Scheduler has invoked " + exflow.getExecutionId());
        } catch (ExecutorManagerException e) {
            cleanupQuietly(exflow);
            project.info("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
            logger.error(e.getMessage(), e);
        }
    } catch (Exception e) {
        // Boundary catch: one failing flow must not stop the scheduler
        // or prevent the flow from being rescheduled.
        logger.error("Could not run flow. " + e.getMessage(), e);
    }
}

/**
 * Cleans up after a failed launch, logging (rather than propagating) a
 * failure of the cleanup itself.
 *
 * @param exflow the executable flow whose resources should be cleaned up
 */
private void cleanupQuietly(ExecutableFlow exflow) {
    try {
        executorManager.cleanupAll(exflow);
    } catch (ExecutorManagerException e) {
        logger.error("Could not clean up after failed launch of " + exflow.getExecutionId(), e);
    }
}
/**
 * Orders scheduled flows by their next execution time, earliest first.
 *
 * @author Richard Park
 */
private class ScheduleComparator implements Comparator<ScheduledFlow> {
    @Override
    public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
        final DateTime rightTime = arg1.getNextExecTime();
        final DateTime leftTime = arg0.getNextExecTime();

        if (rightTime.isEqual(leftTime)) {
            return 0;
        }
        // arg0 sorts after arg1 exactly when arg1 is due earlier.
        return rightTime.isBefore(leftTime) ? 1 : -1;
    }
}
}
}