SLAManager.java

487 lines | 15.839 kB Blame History Raw Download
package azkaban.sla;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;

import azkaban.executor.Status;

import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.Props;

/*
 * Copyright 2012 LinkedIn, Inc
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */



/**
 * The SLAManager stores and checks SLAs (Service Level Agreements). A single
 * background thread ({@link SLARunner}) waits until the check time of the
 * earliest queued SLA, then evaluates the flow (or the individual job, if the
 * SLA names one) against the SLA rule and takes the configured actions.
 */
public class SLAManager {
	private static final Logger logger = Logger.getLogger(SLAManager.class);

	private final SLALoader loader;

	private final SLARunner runner;
	private final ExecutorManager executorManager;
	private final SlaMailer mailer;

	// Written by the runner thread and read through getLastCheckTime() by other
	// threads; volatile so the latest value is visible to readers.
	private volatile long lastCheckTime = -1;
	
	/**
	 * Creates the manager, queues every SLA returned by the loader and starts
	 * the runner thread.
	 * 
	 * @param executorManager used to look up, inspect and cancel the executions being checked
	 * @param loader persistence layer SLAs are loaded from, inserted into and removed from
	 * @param props configuration handed to the {@link SlaMailer}
	 * @throws SLAManagerException propagated unchanged if the loader fails to load the SLAs
	 */
	public SLAManager(ExecutorManager executorManager,
							SLALoader loader,
							Props props) throws SLAManagerException 
	{
		this.executorManager = executorManager;
		this.loader = loader;
		this.mailer = new SlaMailer(props);
		this.runner = new SLARunner();

		List<SLA> slaList = loader.loadSLAs();
		if (slaList != null) {
			for (SLA sla : slaList) {
				runner.addRunnerSLA(sla);
			}
		}

		this.runner.start();
	}

	/**
	 * Shuts down the SLA thread. After shutdown, it may not be safe to use
	 * this manager again.
	 */
	public void shutdown() {
		this.runner.shutdown();
	}

	/**
	 * Removes the SLA from the runner queue and from the loader's store.
	 * 
	 * @param s the SLA to remove
	 * @throws SLAManagerException propagated from the loader if removal fails
	 */
	public void removeSLA(SLA s) throws SLAManagerException {
		logger.info("Removing SLA " + s.toString());
		runner.removeRunnerSLA(s);
		loader.removeSLA(s);
	}

	/**
	 * Creates a new SLA, persists it through the loader and queues it on the
	 * runner.
	 * 
	 * @param execId execution the SLA applies to
	 * @param id SLA target id; job-level SLAs submitted by this class pass the job id here
	 * @param checkTime time at which the SLA is evaluated
	 * @param emails recipients for SLA notifications
	 * @param slaActions actions to take when the SLA is missed
	 * @param jobSettings per-job settings (used by WAITANDCHECKJOB SLAs; may be null)
	 * @param slaRule rule the execution is checked against
	 * @throws SLAManagerException if persisting the SLA fails; the original
	 *         exception is attached as the cause
	 */
	public void submitSla(
			int execId, 
			String id,
			DateTime checkTime, 
			List<String> emails,
			List<SlaAction> slaActions,
			List<SlaSetting> jobSettings,
			SlaRule slaRule
			) throws SLAManagerException {
		SLA s = new SLA(execId, id, checkTime, emails, slaActions, jobSettings, slaRule);
		logger.info("Submitting SLA " + s.toString());
		try {
			loader.insertSLA(s);
			runner.addRunnerSLA(s);
		}
		catch (SLAManagerException e) {
			SLAManagerException wrapped = new SLAManagerException("Failed to add new SLA! " + e.getMessage());
			// Keep the original stack trace reachable for diagnosis.
			wrapped.initCause(e);
			throw wrapped;
		}
	}
	
	/**
	 * Thread that sleeps until the earliest queued SLA is due and then checks
	 * it. Adding an SLA wakes the thread through {@link Object#notifyAll()} on
	 * this runner's monitor, so it recomputes its wait time.
	 */
	public class SLARunner extends Thread {
		private final PriorityBlockingQueue<SLA> SLAs;
		private final AtomicBoolean stillAlive = new AtomicBoolean(true);

		// One minute: the longest the thread sleeps between queue checks, and the
		// re-check interval for WAITANDCHECKJOB SLAs with jobs still pending.
		private static final int TIMEOUT_MS = 60000;

		public SLARunner() {
			this.setName("SLAManagerThread");
			SLAs = new PriorityBlockingQueue<SLA>(1, new SLAComparator());
		}

		/**
		 * Stops the run loop. The interrupt only breaks the thread out of a
		 * pending wait; the loop then exits because stillAlive is false.
		 */
		public void shutdown() {
			logger.info("Shutting down SLA runner thread");
			stillAlive.set(false);
			this.interrupt();
		}

		/**
		 * Returns a snapshot copy of the queued SLAs (not in check-time order).
		 * 
		 * @return a new list holding the currently queued SLAs
		 */
		protected synchronized List<SLA> getRunnerSLAs() {
			return new ArrayList<SLA>(SLAs);
		}

		/**
		 * Adds an SLA to the queue and wakes the runner so it recomputes its
		 * wait time.
		 * 
		 * @param s the SLA to queue
		 */
		public synchronized void addRunnerSLA(SLA s) {
			logger.info("Adding " + s + " to SLA runner.");
			SLAs.add(s);
			// notifyAll() rather than interrupt(): this method is also called from
			// the runner thread itself, and a self-interrupt would leave the
			// interrupt flag set during the loader calls that follow it.
			this.notifyAll();
		}
		
		/**
		 * Removes an SLA from the queue. Does not wake the runner; a removed SLA
		 * is simply absent on the next check.
		 * 
		 * @param s the SLA to remove
		 */
		public synchronized void removeRunnerSLA(SLA s) {
			logger.info("Removing " + s + " from the SLA runner.");
			SLAs.remove(s);
		}

		public void run() {
			while (stillAlive.get()) {
				synchronized (this) {
					try {
						lastCheckTime = System.currentTimeMillis();
						
						SLA s = SLAs.peek();
						if (s == null) {
							// Nothing queued: wake up every TIMEOUT_MS to look again.
							waitQuietly(TIMEOUT_MS);
						} else if (!s.getCheckTime().isAfterNow()) {
							// Check time reached. The lock is held, so poll() returns s.
							checkSla(SLAs.poll());
						} else {
							long millisWait = s.getCheckTime().getMillis() - System.currentTimeMillis();
							// Object.wait(0) blocks indefinitely. If the check time passed
							// since isAfterNow() above, loop and check it immediately.
							if (millisWait > 0) {
								waitQuietly(Math.min(millisWait, TIMEOUT_MS));
							}
						}
					} catch (Exception e) {
						logger.error("Unexpected exception has been thrown in scheduler", e);
					} catch (Throwable e) {
						logger.error("Unexpected throwable has been thrown in scheduler", e);
					}
				}
			}
		}

		/**
		 * Waits on this runner's monitor for at most timeoutMs (must be > 0).
		 * The caller must hold the monitor.
		 */
		private void waitQuietly(long timeoutMs) {
			try {
				this.wait(timeoutMs);
			} catch (InterruptedException e) {
				// Only shutdown() interrupts; the run loop re-checks stillAlive.
			}
		}

		/**
		 * Evaluates one due SLA that has already been taken off the queue.
		 * Flow-level WAITANDCHECKJOB SLAs are expanded into job SLAs; all others
		 * are checked against their rule, acted on, and removed from the store.
		 */
		private void checkSla(SLA runningSLA) throws Exception {
			logger.info("Checking sla " + runningSLA.toString() );
			
			ExecutableFlow exflow = executorManager.getExecutableFlow(runningSLA.getExecId());
			
			if (runningSLA.getJobName().equals("") && runningSLA.getRule().equals(SlaRule.WAITANDCHECKJOB)) {
				checkPendingJobSlas(runningSLA, exflow);
				return;
			}

			if (!metSla(runningSLA, exflow)) {
				takeSLAFailActions(runningSLA, exflow);
			} else {
				takeSLASuccessActions(runningSLA, exflow);
			}
			removeRunnerSLA(runningSLA);
			loader.removeSLA(runningSLA);
		}

		/**
		 * Handles a flow-level WAITANDCHECKJOB SLA.
		 * <ul>
		 * <li>Submits a job SLA for each configured job that has started, or for
		 * every remaining job once the flow has finished.</li>
		 * <li>Drops settings whose job no longer exists in the flow, after
		 * emailing about it.</li>
		 * <li>Re-queues the SLA if any settings are still pending.</li>
		 * </ul>
		 */
		private void checkPendingJobSlas(SLA runningSLA, ExecutableFlow exflow) throws Exception {
			int execId = runningSLA.getExecId();
			List<SlaSetting> jobSettings = runningSLA.getJobSettings();
			List<SlaSetting> removeSettings = new ArrayList<SlaSetting>();
			for (SlaSetting setting : jobSettings) {
				ExecutableNode node = exflow.getExecutableNode(setting.getId());
				if (node == null) {
					mailer.sendSlaEmail(runningSLA, "The SLA setting for flow/job is no longer valid as flow structure has changed. Execution " + runningSLA.getExecId());
					removeSettings.add(setting);
				} else if (node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
					submitSla(execId, setting.getId(), new DateTime(node.getStartTime()).plus(setting.getDuration()), runningSLA.getEmails(), setting.getActions(), null, setting.getRule());
					removeSettings.add(setting);
					logger.info("Job " + setting.getId() + " already started, monitoring SLA.");
				}
			}
			for (SlaSetting remove : removeSettings) {
				jobSettings.remove(remove);
			}

			// The stored copy is always replaced: dropped entirely when nothing is
			// pending, otherwise re-inserted below with a new check time.
			removeRunnerSLA(runningSLA);
			loader.removeSLA(runningSLA);
			if (!jobSettings.isEmpty()) {
				// Schedule from "now" when the check time is stale (e.g. after a
				// restart); otherwise a past check time would be re-processed
				// back-to-back, with a store write each time, until it caught up.
				long nextCheck = Math.max(runningSLA.getCheckTime().getMillis(), System.currentTimeMillis()) + TIMEOUT_MS;
				runningSLA.setCheckTime(runningSLA.getCheckTime().withMillis(nextCheck));
				addRunnerSLA(runningSLA);
				loader.insertSLA(runningSLA);
			}
		}

		/**
		 * Returns whether the flow (empty job name) or the named job met the SLA
		 * rule at the check time.
		 * <ul>
		 * <li>FINISH: the end time is set (not -1) and earlier than the check time.</li>
		 * <li>SUCCESS: the status is SUCCEEDED and the end time is earlier than the check time.</li>
		 * <li>Any other rule is logged as an error and counts as missed.</li>
		 * </ul>
		 */
		private boolean metSla(SLA s, ExecutableFlow exflow) {
			SlaRule rule = s.getRule();
			long finishTime;
			Status status;
			if(s.getJobName().equals("")) {
				finishTime = exflow.getEndTime();
				status = exflow.getStatus();
			}
			else {
				ExecutableNode exnode = exflow.getExecutableNode(s.getJobName());
				finishTime = exnode.getEndTime();
				status = exnode.getStatus();
			}
			
			switch(rule) {
				case FINISH:	// check finish time
					return finishTime != -1 && finishTime < s.getCheckTime().getMillis();
				case SUCCESS: 	// check finish and successful
					return status == Status.SUCCEEDED && finishTime < s.getCheckTime().getMillis();
				default: 
					logger.error("Unknown SLA rules!");
					return false;
			}
		}

		/**
		 * Orders SLAs by ascending check time, so the queue head is the earliest.
		 */
		private class SLAComparator implements Comparator<SLA> {
			@Override
			public int compare(SLA arg0, SLA arg1) {
				return Long.compare(arg0.getCheckTime().getMillis(), arg1.getCheckTime().getMillis());
			}
		}
	}

	/**
	 * Runs every action configured on a missed SLA. Each action is isolated,
	 * so a failing email or cancel does not stop later actions. It also cannot
	 * propagate and skip the caller's removal of the SLA from the store.
	 */
	private void takeSLAFailActions(SLA s, ExecutableFlow exflow) {
		logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
		List<SlaAction> actions = s.getActions();
		for(SlaAction act : actions) {
			if(act.equals(SlaAction.EMAIL)) {
				try {
					sendSlaAlertEmail(s, exflow);
				}
				catch (Exception e) {
					logger.error("Failed to send out SLA alert email.", e);
				}
			} else if(act.equals(SlaAction.KILL)) {
				try {
					executorManager.cancelFlow(exflow, "azkaban");
				} catch (ExecutorManagerException e) {
					logger.error("Cancel flow failed.", e);
					// Only report a kill that actually happened.
					continue;
				}
				try {
					sendSlaKillEmail(s, exflow);
				} catch (Exception e) {
					logger.error("Failed to send out SLA kill email.", e);
				}
			}
		}
	}
	
	/**
	 * Hook for SLAs that were met. Currently a no-op: the success notification
	 * (sendSlaSuccessEmail) is disabled.
	 */
	private void takeSLASuccessActions(SLA s, ExecutableFlow exflow) {
		//sendSlaSuccessEmail(s, exflow);
		
	}
	
	/**
	 * Builds the "SLA missed" message for the flow or named job and sends it
	 * through the mailer.
	 */
	private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
		String message = null;
		ExecutableNode exnode;
		switch(s.getRule()) {
			case FINISH:
				if(s.getJobName().equals("")) {
					message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
				}
				else {
					exnode = exflow.getExecutableNode(s.getJobName());
					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
				}
				break;
			case SUCCESS:
				if(s.getJobName().equals("")) {
					message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA. " + String.format("%n");
					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
				}
				else {
					exnode = exflow.getExecutableNode(s.getJobName());
					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA." + String.format("%n"); 
					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
				}
				break;
			default:
				logger.error("Unknown SLA rules!");
				message = "Unknown SLA was not met!";
				break;
		}
		mailer.sendSlaEmail(s, message);
	}
	
	/**
	 * Builds the "SLA met" message for the flow or named job and sends it
	 * through the mailer. Currently unused (see takeSLASuccessActions).
	 */
	@SuppressWarnings("unused")
	private void sendSlaSuccessEmail(SLA s, ExecutableFlow exflow) {
		String message = null;
		ExecutableNode exnode;
		switch(s.getRule()) {
			case FINISH:
				if(s.getJobName().equals("")) {
					message = "Flow " + exflow.getFlowId() + " finished within the set SLA." + String.format("%n");
					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
				}
				else {
					exnode = exflow.getExecutableNode(s.getJobName());
					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " finished within the set SLA." + String.format("%n");
					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
				}
				break;
			case SUCCESS:
				if(s.getJobName().equals("")) {
					message = "Flow " + exflow.getFlowId() + " successfully finished within the set SLA." + String.format("%n");
					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
				}
				else {
					exnode = exflow.getExecutableNode(s.getJobName());
					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " successfully finished within the set SLA." + String.format("%n"); 
					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
				}
				break;
			default:
				logger.error("Unknown SLA rules!");
				message = "Unknown SLA was not met!";
				break;
		}
		mailer.sendSlaEmail(s, message);
	}
	
	/**
	 * Builds the "SLA missed, execution killed" message for the flow or named
	 * job and sends it through the mailer.
	 */
	private void sendSlaKillEmail(SLA s, ExecutableFlow exflow) {
		String message = null;
		ExecutableNode exnode;
		switch(s.getRule()) {
			case FINISH:
				if(s.getJobName().equals("")) {
					message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA and is killed. " + String.format("%n");
					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
				}
				else {
					exnode = exflow.getExecutableNode(s.getJobName());
					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA and is killed. " + String.format("%n");
					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
				}
				break;
			case SUCCESS:
				if(s.getJobName().equals("")) {
					message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA and is killed. " + String.format("%n");
					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
				}
				else {
					exnode = exflow.getExecutableNode(s.getJobName());
					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA and is killed. " + String.format("%n"); 
					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
				}
				break;
			default:
				logger.error("Unknown SLA rules!");
				message = "Unknown SLA was not met!";
				break;
		}
		mailer.sendSlaEmail(s, message);
	}

	/** @return the number of SLAs currently queued on the runner */
	public int getNumActiveSLA() {
		return runner.getRunnerSLAs().size();
	}
	
	/** @return the current state of the runner thread */
	public State getSLAThreadState() {
		return runner.getState();
	}
	
	/** @return whether the runner thread is alive */
	public boolean isThreadActive() {
		return runner.isAlive();
	}
	
	/** @return a snapshot copy of the queued SLAs */
	public List<SLA> getSLAList() {
		return runner.getRunnerSLAs();
	}
	
	/** @return epoch millis of the runner's latest loop iteration, or -1 before the first */
	public long getLastCheckTime() {
		return lastCheckTime;
	}
}