ScheduleManager.java

430 lines | 12.563 kB Blame History Raw Download
package azkaban.scheduler;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormat;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;

import azkaban.flow.Flow;
import azkaban.jobExecutor.utils.JobExecutionException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;

import azkaban.utils.Props;

/**
 * The ScheduleManager stores and executes the schedule. It uses a single
 * background thread ({@link ScheduleRunner}) that waits until the next
 * scheduled execution time and then submits the flow for execution. A flow is
 * not removed from the schedule when it runs; recurring flows are re-queued
 * for their next period, which can potentially allow runs of the same flow to
 * overlap each other.
 *
 * <p>Lock ordering: methods synchronized on this manager may acquire the
 * runner's monitor, but the runner thread never acquires this manager's
 * monitor while holding its own. Keep it that way to avoid deadlock.
 */
public class ScheduleManager {
	private static final Logger logger = Logger.getLogger(ScheduleManager.class);

	private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
	private final ScheduleLoader loader;
	/** Schedule id -> scheduled flow. Guarded by {@code this}. */
	private final Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
	private final ScheduleRunner runner;
	private final ExecutorManager executorManager;
	private final ProjectManager projectManager;

	/**
	 * Creates the manager, loads the persisted schedule through {@code loader}
	 * and starts the scheduler thread.
	 * 
	 * @param executorManager used to create, set up and execute flows
	 * @param projectManager used to look up projects, flows and their files
	 * @param loader loads the initial schedule and persists later changes
	 */
	public ScheduleManager(ExecutorManager executorManager,
							ProjectManager projectManager, 
							ScheduleLoader loader) 
	{
		this.executorManager = executorManager;
		this.projectManager = projectManager;
		this.loader = loader;
		this.runner = new ScheduleRunner();

		List<ScheduledFlow> scheduleList = loader.loadSchedule();
		for (ScheduledFlow flow : scheduleList) {
			internalSchedule(flow);
		}

		this.runner.start();
	}

	/**
	 * Shuts down the scheduler thread. After shutdown, it may not be safe to
	 * use this manager again.
	 */
	public void shutdown() {
		this.runner.shutdown();
	}

	/**
	 * Retrieves a copy of the list of schedules currently queued in the runner.
	 * 
	 * @return a new list; modifying it does not affect the schedule
	 */
	public synchronized List<ScheduledFlow> getSchedule() {
		return runner.getSchedule();
	}

	/**
	 * Returns the scheduled flow registered under the given schedule id.
	 * 
	 * @param scheduleId the schedule id
	 * @return the scheduled flow, or {@code null} if none is registered
	 */
	public synchronized ScheduledFlow getSchedule(String scheduleId) {
		// Synchronized because scheduleIDMap is a plain LinkedHashMap written
		// only under this lock.
		return scheduleIDMap.get(scheduleId);
	}

	/**
	 * Removes the flow from the schedule if it exists and persists the result.
	 * 
	 * @param scheduleId the schedule id
	 * @return the removed flow, or {@code null} if no such schedule existed
	 */
	public synchronized ScheduledFlow removeScheduledFlow(String scheduleId) {
		ScheduledFlow flow = scheduleIDMap.remove(scheduleId);
		if (flow == null) {
			logger.info("No schedule with id '" + scheduleId + "' to remove.");
			return null;
		}

		runner.removeScheduledFlow(flow);
		saveSchedule();

		return flow;
	}

	// public synchronized void pauseScheduledFlow(String scheduleId){
	// try{
	// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
	// flow.setSchedStatus(SchedStatus.LASTPAUSED);
	// loader.saveSchedule(getSchedule());
	// }
	// catch (Exception e) {
	// throw new RuntimeException("Error pausing a schedule " + scheduleId);
	// }
	// }
	//
	// public synchronized void resumeScheduledFlow(String scheduleId){
	// try {
	// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
	// flow.setSchedStatus(SchedStatus.LASTSUCCESS);
	// loader.saveSchedule(getSchedule());
	// }
	// catch (Exception e) {
	// throw new RuntimeException("Error resuming a schedule " + scheduleId);
	// }
	// }

	/**
	 * Schedules a recurring flow and persists the schedule.
	 * 
	 * @param scheduleId id of the schedule; replaces any existing schedule with this id
	 * @param projectId project containing the flow
	 * @param flowId flow to run
	 * @param user user the flow is submitted as
	 * @param userSubmit user who created the schedule
	 * @param submitTime time the schedule was submitted
	 * @param firstSchedTime first execution time
	 * @param period recurrence period
	 * @return the created scheduled flow
	 */
	public ScheduledFlow schedule(
			final String scheduleId, 
			final String projectId,
			final String flowId, 
			final String user, 
			final String userSubmit,
			final DateTime submitTime, 
			final DateTime firstSchedTime,
			final ReadablePeriod period) {
		logger.info("Scheduling flow '" + scheduleId + "' for "
				+ _dateFormat.print(firstSchedTime) + " with a period of "
				+ PeriodFormat.getDefault().print(period));
		
		ScheduledFlow scheduleFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, period);
		schedule(scheduleFlow);
		return scheduleFlow;
	}

	/**
	 * Schedules a flow without a period and persists the schedule.
	 * 
	 * @param scheduleId id of the schedule; replaces any existing schedule with this id
	 * @param projectId project containing the flow
	 * @param flowId flow to run
	 * @param user user the flow is submitted as
	 * @param userSubmit user who created the schedule
	 * @param submitTime time the schedule was submitted
	 * @param firstSchedTime execution time
	 * @return the created scheduled flow
	 */
	public ScheduledFlow schedule(
			String scheduleId,
			String projectId,
			String flowId,
			String user, 
			String userSubmit,
			DateTime submitTime,
			DateTime firstSchedTime) 
	{
		logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
		ScheduledFlow scheduleFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime);
		schedule(scheduleFlow);
		return scheduleFlow;
	}

	/**
	 * Schedules the flow, but doesn't save the schedule afterwards. Any
	 * existing flow with the same schedule id is replaced.
	 * 
	 * @param flow the flow to schedule
	 */
	private synchronized void internalSchedule(ScheduledFlow flow) {
		ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
		// Advance the next execution time before the flow enters the
		// time-ordered queue.
		flow.updateTime();
		if (existing != null) {
			this.runner.removeScheduledFlow(existing);
		}

		this.runner.addScheduledFlow(flow);
		scheduleIDMap.put(flow.getScheduleId(), flow);
	}

	/**
	 * Adds a flow to the schedule and persists the schedule.
	 * 
	 * @param flow the flow to schedule
	 */
	public synchronized void schedule(ScheduledFlow flow) {
		internalSchedule(flow);
		saveSchedule();
	}

	/**
	 * Saves the schedule through the loader. This takes the manager's monitor
	 * (via {@link #getSchedule()}), so it must not be called by the runner
	 * thread while it holds the runner's monitor.
	 */
	private void saveSchedule() {
		loader.saveSchedule(getSchedule());
	}

	/**
	 * Thread that simply invokes the running of flows when the schedule is
	 * ready.
	 * 
	 * @author Richard Park
	 * 
	 */
	public class ScheduleRunner extends Thread {
		private final PriorityBlockingQueue<ScheduledFlow> schedule;
		private final AtomicBoolean stillAlive = new AtomicBoolean(true);

		// Upper bound for a single wait (five minutes), so the queue head is
		// re-checked at least this often.
		private static final int TIMEOUT_MS = 300000;

		public ScheduleRunner() {
			schedule = new PriorityBlockingQueue<ScheduledFlow>(1,new ScheduleComparator());
		}

		/**
		 * Stops the run loop. The interrupt wakes the thread if it is waiting.
		 */
		public void shutdown() {
			logger.info("Shutting down scheduler thread");
			stillAlive.set(false);
			this.interrupt();
		}

		/**
		 * Return a list of scheduled flow
		 * 
		 * @return a new list holding the currently queued flows
		 */
		public synchronized List<ScheduledFlow> getSchedule() {
			return new ArrayList<ScheduledFlow>(schedule);
		}

		/**
		 * Adds the flow to the schedule and then wakes the runner so it will
		 * update its wait time.
		 * 
		 * @param flow the flow to add
		 */
		public synchronized void addScheduledFlow(ScheduledFlow flow) {
			logger.info("Adding " + flow + " to schedule.");
			schedule.add(flow);
			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
			// System.currentTimeMillis(),
			// WorkflowAction.SCHEDULE_WORKFLOW,
			// WorkflowState.NOP,
			// flow.getId());

			// The run loop waits on this monitor; notifying (rather than
			// interrupting) keeps interrupts reserved for shutdown.
			this.notifyAll();
		}

		/**
		 * Remove scheduled flows. Does not wake the runner.
		 * 
		 * @param flow the flow to remove
		 */
		public synchronized void removeScheduledFlow(ScheduledFlow flow) {
			logger.info("Removing " + flow + " from the schedule.");
			schedule.remove(flow);
			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
			// System.currentTimeMillis(),
			// WorkflowAction.UNSCHEDULE_WORKFLOW,
			// WorkflowState.NOP,
			// flow.getId());
			// No need to wake the runner: if this flow was at the head of the
			// queue, the runner re-reads the new head when its wait expires.
		}

		@Override
		public void run() {
			while (stillAlive.get()) {
				synchronized (this) {
					try {
						ScheduledFlow schedFlow = schedule.peek();

						if (schedFlow == null) {
							// Nothing scheduled; wake up periodically (or when
							// notified of a new flow) to check again.
							waitQuietly(TIMEOUT_MS);
						} else if (schedFlow.getNextExecTime().isAfterNow()) {
							// Wait until the flow is due. Clamp to at least 1 ms:
							// wait(0) would block indefinitely.
							long millisWait = Math.max(1, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
							waitQuietly(Math.min(millisWait, TIMEOUT_MS));
						} else {
							// We've passed the flow execution time, so we will run.
							ScheduledFlow runningFlow = schedule.poll();
							logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());

							// Never throws for ordinary failures, so the flow is
							// always considered for rescheduling below.
							runScheduledFlow(runningFlow);

							// Immediately reschedule if it's possible. Let the
							// execution manager handle any duplicate runs.
							// NOTE(review): flows that do not reschedule remain in
							// scheduleIDMap; confirm whether they should be removed.
							if (runningFlow.updateTime()) {
								schedule.add(runningFlow);
							}

							// Persist using this runner's own snapshot. Going through
							// the manager would take its monitor while we hold ours,
							// inverting the lock order and risking deadlock.
							loader.saveSchedule(this.getSchedule());
						}
					} catch (Exception e) {
						logger.error("Unexpected exception has been thrown in scheduler", e);
					} catch (Throwable e) {
						logger.error("Unexpected throwable has been thrown in scheduler", e);
					}
				}
			}
		}

		/**
		 * Waits on this runner's monitor for at most {@code millis} ms. The caller
		 * must hold the monitor and {@code millis} must be positive.
		 */
		private void waitQuietly(long millis) {
			try {
				this.wait(millis);
			} catch (InterruptedException e) {
				// Interrupts come from shutdown(); the run loop re-checks stillAlive.
			}
		}

		/**
		 * Sets up and submits one due flow for execution. Failures are logged
		 * and swallowed so that a single bad flow can neither stop this thread
		 * nor drop itself from the schedule.
		 * 
		 * @param runningFlow the flow whose execution time has been reached
		 */
		private void runScheduledFlow(ScheduledFlow runningFlow) {
			try {
				Project project = projectManager.getProject(runningFlow.getProjectId());
				if (project == null) {
					logger.error("Scheduled Project " + runningFlow.getProjectId() + " does not exist!");
					logger.error("Error finding the scheduled project. " + runningFlow.getScheduleId());
					return;
				}

				Flow flow = project.getFlow(runningFlow.getFlowId());
				if (flow == null) {
					logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
					logger.error("Error finding the scheduled flow. " + runningFlow.getScheduleId());
					return;
				}

				try {
					// The returned properties are not used yet; the call is kept so
					// that unreadable flow properties still abort this run.
					projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
				} catch (ProjectManagerException e) {
					logger.error("Error getting the flow resources. " + runningFlow.getScheduleId(), e);
					return;
				}

				// Create ExecutableFlow
				ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
				exflow.setSubmitUser(runningFlow.getUser());
				// TODO make disabled in scheduled flow
				// Map<String, String> paramGroup =
				// this.getParamGroup(req, "disabled");
				// for (Map.Entry<String, String> entry:
				// paramGroup.entrySet()) {
				// boolean nodeDisabled =
				// Boolean.parseBoolean(entry.getValue());
				// exflow.setStatus(entry.getKey(),
				// nodeDisabled ? Status.DISABLED :
				// Status.READY);
				// }

				// Create directory
				try {
					executorManager.setupExecutableFlow(exflow);
				} catch (ExecutorManagerException e) {
					cleanupQuietly(exflow);
					logger.error("Error setting up execution for schedule " + runningFlow.getScheduleId(), e);
					return;
				}

				// Copy files to the source.
				File executionDir = new File(exflow.getExecutionPath());
				try {
					projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
				} catch (ProjectManagerException e) {
					cleanupQuietly(exflow);
					logger.error("Error copying project files for schedule " + runningFlow.getScheduleId(), e);
					return;
				}

				try {
					executorManager.executeFlow(exflow);
					project.info("Scheduler has invoked " + exflow.getExecutionId());
				} catch (ExecutorManagerException e) {
					cleanupQuietly(exflow);
					project.info("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
					logger.error("Error executing flow for schedule " + runningFlow.getScheduleId(), e);
				}
			} catch (JobExecutionException e) {
				logger.info("Could not run flow. " + e.getMessage());
			} catch (Exception e) {
				logger.error("Unexpected exception while running schedule " + runningFlow.getScheduleId(), e);
			}
		}

		/**
		 * Best-effort cleanup of a failed execution; a cleanup failure is logged,
		 * not propagated.
		 */
		private void cleanupQuietly(ExecutableFlow exflow) {
			try {
				executorManager.cleanupAll(exflow);
			} catch (ExecutorManagerException e) {
				logger.error("Failed to clean up execution " + exflow.getExecutionId(), e);
			}
		}

		/**
		 * Class to sort the schedule based on time: earliest next execution
		 * time first.
		 * 
		 * @author Richard Park
		 */
		private class ScheduleComparator implements Comparator<ScheduledFlow> {
			@Override
			public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
				return arg0.getNextExecTime().compareTo(arg1.getNextExecTime());
			}
		}
	}
}