SLAManager.java
Home
/
src /
java /
azkaban /
sla /
SLAManager.java
package azkaban.sla;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.velocity.runtime.parser.node.GetExecutor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.sla.SLA.SlaAction;
import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
import azkaban.user.User;
import azkaban.utils.Pair;
import azkaban.utils.Props;
public class SLAManager {
private static Logger logger = Logger.getLogger(SLAManager.class);
private SLALoader loader;
private final SLARunner runner;
private final SLAPreRunner prerunner;
private final ExecutorManager executorManager;
private SlaMailer mailer;
public SLAManager(ExecutorManager executorManager,
SLALoader loader,
Props props) throws SLAManagerException
{
this.executorManager = executorManager;
this.loader = loader;
this.mailer = new SlaMailer(props);
this.runner = new SLARunner();
this.prerunner = new SLAPreRunner();
List<SLA> SLAList = null;
try {
SLAList = loader.loadSLAs();
} catch (SLAManagerException e) {
throw e;
}
for (SLA sla : SLAList) {
runner.addRunnerSLA(sla);
}
this.runner.start();
}
public void shutdown() {
this.runner.shutdown();
this.prerunner.shutdown();
}
public void removeSLA(SLA s) throws SLAManagerException {
logger.info("Removing SLA " + s.toString());
runner.removeRunnerSLA(s);
loader.removeSLA(s);
}
public void submitSla(
int execId,
String id,
DateTime checkTime,
List<String> emails,
List<SlaAction> slaActions,
List<SlaSetting> jobSettings,
SlaRule slaRule
) throws SLAManagerException {
SLA s = new SLA(execId, id, checkTime, emails, slaActions, jobSettings, slaRule);
logger.info("Submitting SLA " + s.toString());
try {
loader.insertSLA(s);
runner.addRunnerSLA(s);
}
catch (SLAManagerException e) {
throw new SLAManagerException("Failed to add new SLA!" + e.getCause());
}
}
public class SLARunner extends Thread {
private final PriorityBlockingQueue<SLA> SLAs;
private AtomicBoolean stillAlive = new AtomicBoolean(true);
private static final int TIMEOUT_MS = 60000;
public SLARunner() {
SLAs = new PriorityBlockingQueue<SLA>(1,new SLAComparator());
}
public void shutdown() {
logger.error("Shutting down SLA runner thread");
stillAlive.set(false);
this.interrupt();
}
protected synchronized List<SLA> getRunnerSLAs() {
return new ArrayList<SLA>(SLAs);
}
public synchronized void addRunnerSLA(SLA s) {
logger.info("Adding " + s + " to SLA runner.");
SLAs.add(s);
this.interrupt();
}
public synchronized void removeRunnerSLA(SLA s) {
logger.info("Removing " + s + " from the SLA runner.");
SLAs.remove(s);
}
public void run() {
while (stillAlive.get()) {
synchronized (this) {
try {
SLA s = SLAs.peek();
if (s == null) {
try {
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
}
} else {
if (!(new DateTime(s.getCheckTime())).isAfterNow()) {
SLA runningSLA = SLAs.poll();
logger.info("Checking sla " + runningSLA.toString() );
int execId = s.getExecId();
ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
if(runningSLA.getJobName().equals("") && runningSLA.getRule().equals(SlaRule.WAITANDCHECKJOB)) {
List<SlaSetting> jobSettings = runningSLA.getJobSettings();
List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
for(SlaSetting set : jobSettings) {
ExecutableNode node = exflow.getExecutableNode(set.getId());
if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
removeSettings.add(set);
logger.info("Job " + set.getId() + " already started, monitoring SLA.");
}
}
for(SlaSetting remove : removeSettings) {
jobSettings.remove(remove);
}
if(jobSettings.size() == 0) {
removeRunnerSLA(runningSLA);
loader.removeSLA(runningSLA);
}
else {
removeRunnerSLA(runningSLA);
loader.removeSLA(runningSLA);
runningSLA.setCheckTime(runningSLA.getCheckTime().plusMillis(TIMEOUT_MS));
addRunnerSLA(runningSLA);
loader.insertSLA(runningSLA);
}
}
else {
if(!metSla(runningSLA, exflow)) {
takeSLAActions(runningSLA, exflow);
}
removeRunnerSLA(runningSLA);
loader.removeSLA(runningSLA);
}
} else {
long millisWait = Math.max(0, s.getCheckTime().getMillis() - (new DateTime()).getMillis());
try {
this.wait(Math.min(millisWait, TIMEOUT_MS));
} catch (InterruptedException e) {
}
}
}
} catch (Exception e) {
logger.error("Unexpected exception has been thrown in scheduler", e);
} catch (Throwable e) {
logger.error("Unexpected throwable has been thrown in scheduler", e);
}
}
}
}
private boolean metSla(SLA s, ExecutableFlow exflow) {
SlaRule rule = s.getRule();
long finishTime;
Status status;
if(s.getJobName().equals("")) {
finishTime = exflow.getEndTime();
status = exflow.getStatus();
}
else {
ExecutableNode exnode = exflow.getExecutableNode(s.getJobName());
finishTime = exnode.getEndTime();
status = exnode.getStatus();
}
switch(rule) {
case FINISH:
return finishTime != -1 && finishTime < s.getCheckTime().getMillis();
case SUCCESS:
return status == Status.SUCCEEDED && finishTime < s.getCheckTime().getMillis();
default:
logger.error("Unknown SLA rules!");
return false;
}
}
private class SLAComparator implements Comparator<SLA> {
@Override
public int compare(SLA arg0, SLA arg1) {
long first = arg1.getCheckTime().getMillis();
long second = arg0.getCheckTime().getMillis();
if (first == second) {
return 0;
} else if (first < second) {
return 1;
}
return -1;
}
}
}
private void takeSLAActions(SLA s, ExecutableFlow exflow) {
logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
List<SlaAction> actions = s.getActions();
for(SlaAction act : actions) {
if(act.equals(SlaAction.EMAIL)) {
try {
sendSlaAlertEmail(s, exflow);
}
catch (Exception e) {
logger.error("Failed to send out SLA alert email. " + e.getCause());
}
} else if(act.equals(SlaAction.KILL)) {
try {
executorManager.cancelFlow(exflow, "azkaban");
} catch (ExecutorManagerException e) {
logger.error("Cancel flow failed." + e.getCause());
}
}
}
}
private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
String message = null;
ExecutableNode exnode;
switch(s.getRule()) {
case FINISH:
if(s.getJobName().equals("")) {
message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
}
else {
exnode = exflow.getExecutableNode(s.getJobName());
message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
}
break;
case SUCCESS:
if(s.getJobName().equals("")) {
message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA. " + String.format("%n");
message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format(" %n");
message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
}
else {
exnode = exflow.getExecutableNode(s.getJobName());
message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA." + String.format("%n");
message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
}
break;
default:
logger.error("Unknown SLA rules!");
message = "Unknown SLA was not met!";
break;
}
mailer.sendSlaEmail(s, message);
}
public class SLAPreRunner extends Thread {
private final List<SLA> preSlas;
private AtomicBoolean stillAlive = new AtomicBoolean(true);
private static final int TIMEOUT_MS = 300000;
public SLAPreRunner() {
preSlas = new ArrayList<SLA>();
}
public void shutdown() {
logger.error("Shutting down pre-sla checker thread");
stillAlive.set(false);
this.interrupt();
}
protected synchronized List<SLA> getPreSlas() {
return new ArrayList<SLA>(preSlas);
}
public synchronized void addCheckerPreSla(SLA s) {
logger.info("Adding " + s + " to pre-sla checker.");
preSlas.add(s);
this.interrupt();
}
public synchronized void removeCheckerPreSla(SLA s) {
logger.info("Removing " + s + " from the pre-sla checker.");
preSlas.remove(s);
}
public void run() {
while (stillAlive.get()) {
synchronized (this) {
try {
if (preSlas.size() == 0) {
try {
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
}
} else {
for(SLA s : preSlas) {
ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
String id = s.getJobName();
if(!s.equals("")) {
ExecutableNode exnode = exflow.getExecutableNode(id);
if(exnode.getStartTime() != -1) {
}
}
}
}
} catch (Exception e) {
logger.error("Unexpected exception has been thrown in scheduler", e);
} catch (Throwable e) {
logger.error("Unexpected throwable has been thrown in scheduler", e);
}
}
}
}
}
}