ScheduleManager.java

410 lines | 13.904 kB Blame History Raw Download
package azkaban.scheduler;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormat;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.jobExecutor.utils.JobExecutionException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.scheduler.ScheduledFlow.SchedStatus;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Props;



/**
 * The ScheduleManager stores and executes the schedule. It uses a single thread instead
 * and waits until correct loading time for the flow. It will not remove the flow from the
 * schedule when it is run, which can potentially allow the flow to and overlap each other.
 * 
 * @author Richard
 */
public class ScheduleManager {
	private static Logger logger = Logger.getLogger(ScheduleManager.class);

    private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
	private ScheduleLoader loader;
    private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>(); 
    private final ScheduleRunner runner;
    private final ExecutorManager executorManager;
    private final ProjectManager projectManager;
    
    /**
     * Give the schedule manager a loader class that will properly load the schedule.
     * 
     * @param loader
     */
    public ScheduleManager(
    		ExecutorManager executorManager,
    		ProjectManager projectManager,
    		ScheduleLoader loader) 
    {
    	this.executorManager = executorManager;
    	this.projectManager = projectManager;
    	this.loader = loader;
    	this.runner = new ScheduleRunner();
    	
    	List<ScheduledFlow> scheduleList = loader.loadSchedule();
    	for (ScheduledFlow flow: scheduleList) {
    		internalSchedule(flow);
    	}
    	
    	this.runner.start();
    }
    
    /**
     * Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
     */
    public void shutdown() {
    	this.runner.shutdown();
    }
    
    /**
     * Retrieves a copy of the list of schedules.
     * 
     * @return
     */
    public synchronized List<ScheduledFlow> getSchedule() {
    	return runner.getSchedule();
    }

    /**
     * Returns the scheduled flow for the flow name
     * 
     * @param id
     * @return
     */
    public ScheduledFlow getSchedule(String scheduleId) {
    	return scheduleIDMap.get(scheduleId);
    }
    
    /**
     * Removes the flow from the schedule if it exists.
     * 
     * @param id
     */
    public synchronized void removeScheduledFlow(String scheduleId) {
    	ScheduledFlow flow = scheduleIDMap.get(scheduleId);
    	scheduleIDMap.remove(scheduleId);
    	runner.removeScheduledFlow(flow);
    	
    	loader.saveSchedule(getSchedule());
    }
    
//    public synchronized void pauseScheduledFlow(String scheduleId){
//    	try{
//        	ScheduledFlow flow = scheduleIDMap.get(scheduleId);
//        	flow.setSchedStatus(SchedStatus.LASTPAUSED);
//        	loader.saveSchedule(getSchedule());
//    	}
//    	catch (Exception e) {
//    		throw new RuntimeException("Error pausing a schedule " + scheduleId);
//		}
//    }
//    
//    public synchronized void resumeScheduledFlow(String scheduleId){
//    	try {
//    		ScheduledFlow flow = scheduleIDMap.get(scheduleId);
//        	flow.setSchedStatus(SchedStatus.LASTSUCCESS);
//        	loader.saveSchedule(getSchedule());			
//		} 
//    	catch (Exception e) {
//			throw new RuntimeException("Error resuming a schedule " + scheduleId);
//		}
//    }
    
    public void schedule(final String scheduleId,
    		final String user,
    		final String userSubmit,
    		final DateTime submitTime,
    		final DateTime firstSchedTime,
    		final ReadablePeriod period
            ) {
    	//TODO: should validate projectId and flowId?
		logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime)
		+ " with a period of " + PeriodFormat.getDefault().print(period));
		schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, period));
	}
    
    /**
     * Schedule the flow
     * @param flowId
     * @param date
     * @param ignoreDep
     */
    public void schedule(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime) {
        logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
        schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime));
    }
    
    /**
     * Schedules the flow, but doesn't save the schedule afterwards.
     * @param flow
     */
    private synchronized void internalSchedule(ScheduledFlow flow) {
    	ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
    	flow.updateTime();
    	if (existing != null) {
    		this.runner.removeScheduledFlow(existing);
    	}
    	
		this.runner.addScheduledFlow(flow);
    	scheduleIDMap.put(flow.getScheduleId(), flow);
    }
    
    /**
     * Adds a flow to the schedule.
     * 
     * @param flow
     */
    public synchronized void schedule(ScheduledFlow flow) {
    	internalSchedule(flow);
    	saveSchedule();
    }
    
    /**
     * Save the schedule
     */
    private void saveSchedule() {
    	loader.saveSchedule(getSchedule());
    }
    
    
    /**
     * Thread that simply invokes the running of flows when the schedule is
     * ready.
     * 
     * @author Richard Park
     *
     */
    public class ScheduleRunner extends Thread {
    	private final PriorityBlockingQueue<ScheduledFlow> schedule;
    	private AtomicBoolean stillAlive = new AtomicBoolean(true);

        	// Five minute minimum intervals
    	private static final int TIMEOUT_MS = 300000;
    	
    	public ScheduleRunner() {
    		schedule = new PriorityBlockingQueue<ScheduledFlow>(1, new ScheduleComparator());
    	}
    	
    	public void shutdown() {
    		logger.error("Shutting down scheduler thread");
    		stillAlive.set(false);
    		this.interrupt();
    	}
    	
    	/**
    	 * Return a list of scheduled flow
    	 * @return
    	 */
    	public synchronized List<ScheduledFlow> getSchedule() {
    		return new ArrayList<ScheduledFlow>(schedule);
    	}
    	
    	/**
    	 * Adds the flow to the schedule and then interrupts so it will update its wait time.
    	 * @param flow
    	 */
        public synchronized void addScheduledFlow(ScheduledFlow flow) {
        	logger.info("Adding " + flow + " to schedule.");
        	schedule.add(flow);
//            MonitorImpl.getInternalMonitorInterface().workflowEvent(null, 
//                    System.currentTimeMillis(),
//                    WorkflowAction.SCHEDULE_WORKFLOW, 
//                    WorkflowState.NOP,
//                    flow.getId());

        	this.interrupt();
        }

        /**
         * Remove scheduled flows. Does not interrupt.
         * 
         * @param flow
         */
        public synchronized void removeScheduledFlow(ScheduledFlow flow) {
        	logger.info("Removing " + flow + " from the schedule.");
        	schedule.remove(flow);
//            MonitorImpl.getInternalMonitorInterface().workflowEvent(null, 
//                    System.currentTimeMillis(),
//                    WorkflowAction.UNSCHEDULE_WORKFLOW,
//                    WorkflowState.NOP,
//                    flow.getId());
        	// Don't need to interrupt, because if this is originally on the top of the queue,
        	// it'll just skip it.
        }
        
        public void run() {
        	while(stillAlive.get()) {
        		synchronized (this) {
        			try {
        				//TODO clear up the exception handling
        				ScheduledFlow schedFlow = schedule.peek();
	    	    		
	    	    		if (schedFlow == null) {
	    	    			// If null, wake up every minute or so to see if there's something to do.
	    	    			// Most likely there will not be.
	    	    			try {
	    	    				this.wait(TIMEOUT_MS);
	    	    			} catch (InterruptedException e) {
	    	    				// interruption should occur when items are added or removed from the queue.
	    	    			}
	    	    		}
	    	    		else {
	    	    			// We've passed the flow execution time, so we will run.
	    	    			if (!schedFlow.getNextExecTime().isAfterNow()) {
	    	    				// Run flow. The invocation of flows should be quick.
	    	    				ScheduledFlow runningFlow = schedule.poll();
	    	    				logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());
	
	    	    				// Execute the flow here
	    	    				try {
	    	    					Project project = projectManager.getProject(runningFlow.getProjectId());
	    	    					if (project == null) {
	    	    						logger.error("Scheduled Project "+runningFlow.getProjectId()+" does not exist!");
	    	    						throw new RuntimeException("Error finding the scheduled project. " + runningFlow.getScheduleId());
	    	    					}
	    	    					
	    	    					Flow flow = project.getFlow(runningFlow.getFlowId());
	    	    					if (flow == null) {
	    	    						logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
	    	    						throw new RuntimeException("Error finding the scheduled flow. " + runningFlow.getScheduleId());
	    	    					}
	    	    					
	    	    					HashMap<String, Props> sources;
	    	    					try {
	    	    						sources = projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
	    	    					}
	    	    					catch (ProjectManagerException e) {
	    	    						logger.error(e.getMessage());
	    	    						throw new RuntimeException("Error getting the flow resources. " + runningFlow.getScheduleId());
	    	    					}
	    	    				    	    					
	    	    					// Create ExecutableFlow
	    	    					ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
	    	    					exflow.setSubmitUser(runningFlow.getUser());
	    	    					//TODO make disabled in scheduled flow
//	    	    					Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
//	    	    					for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
//	    	    						boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
//	    	    						exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
//	    	    					}
	    	    					
	    	    					// Create directory
	    	    					try {
	    	    						executorManager.setupExecutableFlow(exflow);
	    	    					} catch (ExecutorManagerException e) {
	    	    						try {
	    	    							executorManager.cleanupAll(exflow);
	    	    						} catch (ExecutorManagerException e1) {
	    	    							e1.printStackTrace();
	    	    						}
	    	    						logger.error(e.getMessage());
	    	    						return;
	    	    					}

	    	    					// Copy files to the source.
	    	    					File executionDir = new File(exflow.getExecutionPath());
	    	    					try {
	    	    						projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
	    	    					} catch (ProjectManagerException e) {
	    	    						try {
	    	    							executorManager.cleanupAll(exflow);
	    	    						} catch (ExecutorManagerException e1) {
	    	    							e1.printStackTrace();
	    	    						}
	    	    						logger.error(e.getMessage());
	    	    						return;
	    	    					}
	    	    					

	    	    					try {
	    	    						executorManager.executeFlow(exflow);
	    	    					} catch (ExecutorManagerException e) {
	    	    						try {
	    	    							executorManager.cleanupAll(exflow);
	    	    						} catch (ExecutorManagerException e1) {
	    	    							e1.printStackTrace();
	    	    						}
	    	    						
	    	    						logger.error(e.getMessage());
	    	    						return;
	    	    					}
	    	    				} catch (JobExecutionException e) {
	    	    					logger.info("Could not run flow. " + e.getMessage());
	    	    				}
	    	    	        	schedule.remove(runningFlow);
	    	    				
	    	    				// Immediately reschedule if it's possible. Let the execution manager
	    	    				// handle any duplicate runs.
	    	    				if (runningFlow.updateTime()) {
	    	    					schedule.add(runningFlow);
	    	    				}
    	    					saveSchedule();
	    	    			}
	    	    			else {
	    	    				// wait until flow run
	    	    				long millisWait = Math.max(0, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
	    	    				try {
	    							this.wait(Math.min(millisWait, TIMEOUT_MS));
	    						} catch (InterruptedException e) {
	    							// interruption should occur when items are added or removed from the queue.
	    						}
	    	    			}
	    	    		}
        			}
        			catch (Exception e) {
        				logger.error("Unexpected exception has been thrown in scheduler", e);
        			}
        			catch (Throwable e) {
        				logger.error("Unexpected throwable has been thrown in scheduler", e);
        			}
        		}
        	}
        }
        
        /**
         * Class to sort the schedule based on time.
         * 
         * @author Richard Park
         */
        private class ScheduleComparator implements Comparator<ScheduledFlow>{
    		@Override
    		public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
    			DateTime first = arg1.getNextExecTime();
    			DateTime second = arg0.getNextExecTime();
    			
    			if (first.isEqual(second)) {
    				return 0;
    			}
    			else if (first.isBefore(second)) {
    				return 1;
    			}
    			
    			return -1;
    		}	
        }
    }
}