azkaban-uncached
Changes
src/java/azkaban/executor/ExecutorManager.java 39(+26 -13)
src/java/azkaban/jmx/JmxTriggerManager.java 52(+52 -0)
src/java/azkaban/scheduler/Schedule.java 45(+40 -5)
src/java/azkaban/scheduler/ScheduleManager.java 102(+72 -30)
src/java/azkaban/sla/SlaOption.java 129(+129 -0)
src/java/azkaban/trigger/builtin/ExecuteFlowAction.java 145(+108 -37)
src/java/azkaban/trigger/builtin/SlaAlertAction.java 170(+170 -0)
src/java/azkaban/trigger/builtin/SlaChecker.java 229(+229 -0)
src/java/azkaban/trigger/builtin/WatchSlaAction.java 206(+206 -0)
src/java/azkaban/trigger/Trigger.java 87(+84 -3)
src/java/azkaban/trigger/TriggerManager.java 53(+44 -9)
src/java/azkaban/webapp/AzkabanWebServer.java 69(+56 -13)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 352(+183 -169)
src/web/js/azkaban.scheduled.view.js 4(+0 -4)
Details
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 39363ba..8a3864e 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -9,6 +9,7 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorManager.Alerter;
+import azkaban.sla.SlaOption;
import azkaban.utils.AbstractMailer;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
@@ -25,6 +26,29 @@ public class ExecutorMailer extends AbstractMailer implements Alerter {
testMode = props.getBoolean("test.mode", false);
}
+ @SuppressWarnings("unchecked")
+ private void sendSlaAlertEmail(SlaOption slaOption, String slaMessage) {
+ String subject = "Sla Violation Alert";
+ String body = slaMessage;
+ List<String> emailList = (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = super.createEmailMessage(
+ subject,
+ "text/html",
+ emailList);
+
+ message.setBody(body);
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
public void sendFirstErrorMessage(ExecutableFlow flow) {
ExecutionOptions option = flow.getExecutionOptions();
List<String> emailList = option.getDisabledJobs();
@@ -180,4 +204,10 @@ public class ExecutorMailer extends AbstractMailer implements Alerter {
public void alertOnFirstError(ExecutableFlow exflow) throws Exception {
sendFirstErrorMessage(exflow);
}
+
+ @Override
+ public void alertOnSla(SlaOption slaOption, String slaMessage)
+ throws Exception {
+ sendSlaAlertEmail(slaOption, slaMessage);
+ }
}
\ No newline at end of file
src/java/azkaban/executor/ExecutorManager.java 39(+26 -13)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 1b24a07..92a941c 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -46,6 +46,7 @@ import org.joda.time.DateTime;
import azkaban.project.Project;
import azkaban.scheduler.ScheduleStatisticManager;
+import azkaban.sla.SlaOption;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.FileIOUtils;
@@ -84,6 +85,7 @@ public class ExecutorManager {
void alertOnSuccess(ExecutableFlow exflow) throws Exception;
void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
void alertOnFirstError(ExecutableFlow exflow) throws Exception;
+ void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
}
public ExecutorManager(Props props, ExecutorLoader loader, boolean isActive) throws ExecutorManagerException {
@@ -97,9 +99,10 @@ public class ExecutorManager {
this.isActive = isActive;
+ executingManager = new ExecutingManagerUpdaterThread();
+ executingManager.start();
+
if(isActive) {
- executingManager = new ExecutingManagerUpdaterThread();
- executingManager.start();
long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -862,6 +865,7 @@ public class ExecutorManager {
}
private void finalizeFlows(ExecutableFlow flow) {
+
int execId = flow.getExecutionId();
// First we check if the execution in the datastore is complete
@@ -873,22 +877,30 @@ public class ExecutorManager {
else {
dsFlow = executorLoader.fetchExecutableFlow(execId);
+
+ }
+
+ if(isActive) {
// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
if (!isFinished(dsFlow)) {
failEverything(dsFlow);
executorLoader.updateExecutableFlow(dsFlow);
}
- }
-
- // Delete the executing reference.
- if (flow.getEndTime() == -1) {
- flow.setEndTime(System.currentTimeMillis());
- executorLoader.updateExecutableFlow(dsFlow);
- }
- executorLoader.removeActiveExecutableReference(execId);
- runningFlows.remove(execId);
- recentlyFinished.put(execId, dsFlow);
+ // Delete the executing reference.
+ if (flow.getEndTime() == -1) {
+ flow.setEndTime(System.currentTimeMillis());
+ executorLoader.updateExecutableFlow(dsFlow);
+ }
+ executorLoader.removeActiveExecutableReference(execId);
+
+ runningFlows.remove(execId);
+ recentlyFinished.put(execId, dsFlow);
+ } else {
+ runningFlows.remove(execId);
+ recentlyFinished.put(execId, dsFlow);
+ return;
+ }
} catch (ExecutorManagerException e) {
logger.error(e);
}
@@ -1005,6 +1017,7 @@ public class ExecutorManager {
}
private ExecutableFlow updateExecution(Map<String,Object> updateData) throws ExecutorManagerException {
+
Integer execId = (Integer)updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
if (execId == null) {
throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
@@ -1030,7 +1043,7 @@ public class ExecutorManager {
Status newStatus = flow.getStatus();
ExecutionOptions options = flow.getExecutionOptions();
- if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
+ if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING) && isActive) {
// We want to see if we should give an email status on first failure.
if (options.getNotifyOnFirstFailure()) {
Alerter mailAlerter = alerters.get("email");
src/java/azkaban/jmx/JmxTriggerManager.java 52(+52 -0)
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
new file mode 100644
index 0000000..0bacf7b
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -0,0 +1,52 @@
+package azkaban.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+
+import azkaban.trigger.TriggerManager;
+
+public class JmxTriggerManager implements JmxTriggerManagerMBean {
+ private TriggerManager manager;
+
+ public JmxTriggerManager(TriggerManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public String getLastThreadCheckTime() {
+ return new DateTime(manager.getLastThreadCheckTime()).toString();
+ }
+
+ @Override
+ public boolean isThreadActive() {
+ return manager.isThreadActive();
+ }
+
+ @Override
+ public List<String> getPrimaryTriggerHostPorts() {
+ return new ArrayList<String>(manager.getPrimaryServerHosts());
+ }
+
+ @Override
+ public List<String> getAllTriggerHostPorts() {
+ return new ArrayList<String>(manager.getAllActiveTriggerServerHosts());
+ }
+
+ @Override
+ public int getNumTriggers() {
+ return manager.getNumTriggers();
+ }
+
+ @Override
+ public String getTriggerSources() {
+ return manager.getTriggerSources();
+ }
+
+ @Override
+ public String getTriggerIds() {
+ return manager.getTriggerIds();
+ }
+
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
new file mode 100644
index 0000000..af3cc9a
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -0,0 +1,27 @@
+package azkaban.jmx;
+
+import java.util.List;
+
+public interface JmxTriggerManagerMBean {
+
+ @DisplayName("OPERATION: getLastThreadCheckTime")
+ public String getLastThreadCheckTime();
+
+ @DisplayName("OPERATION: isThreadActive")
+ public boolean isThreadActive();
+
+ @DisplayName("OPERATION: getPrimaryTriggerHostPorts")
+ public List<String> getPrimaryTriggerHostPorts();
+
+ @DisplayName("OPERATION: getAllTriggerHostPorts")
+ public List<String> getAllTriggerHostPorts();
+
+ @DisplayName("OPERATION: getNumTriggers")
+ public int getNumTriggers();
+
+ @DisplayName("OPERATION: getTriggerSources")
+ public String getTriggerSources();
+
+ @DisplayName("OPERATION: getTriggerIds")
+ public String getTriggerIds();
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManager.java b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
index d0af406..9a57638 100644
--- a/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
@@ -19,4 +19,24 @@ public class JmxTriggerRunnerManager implements JmxTriggerRunnerManagerMBean {
return manager.isRunnerThreadActive();
}
+ @Override
+ public int getNumTriggers() {
+ return manager.getNumTriggers();
+ }
+
+ @Override
+ public String getTriggerSources() {
+ return manager.getTriggerSources();
+ }
+
+ @Override
+ public String getTriggerIds() {
+ return manager.getTriggerIds();
+ }
+
+ @Override
+ public long getScannerIdleTime() {
+ return manager.getScannerIdleTime();
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
index 77b72e7..ca0f45b 100644
--- a/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
@@ -5,7 +5,19 @@ public interface JmxTriggerRunnerManagerMBean {
@DisplayName("OPERATION: getLastRunnerThreadCheckTime")
public long getLastRunnerThreadCheckTime();
+ @DisplayName("OPERATION: getNumTriggers")
+ public int getNumTriggers();
+
@DisplayName("OPERATION: isRunnerThreadActive")
public boolean isRunnerThreadActive();
-
+
+ @DisplayName("OPERATION: getTriggerSources")
+ public String getTriggerSources();
+
+ @DisplayName("OPERATION: getTriggerIds")
+ public String getTriggerIds();
+
+ @DisplayName("OPERATION: getScannerIdleTime")
+ public long getScannerIdleTime();
+
}
src/java/azkaban/scheduler/Schedule.java 45(+40 -5)
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index ccbad60..2f529a6 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,7 +16,9 @@
package azkaban.scheduler;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.joda.time.DateTime;
@@ -31,6 +33,7 @@ import org.joda.time.Seconds;
import org.joda.time.Weeks;
import azkaban.executor.ExecutionOptions;
+import azkaban.sla.SlaOption;
import azkaban.utils.Pair;
public class Schedule{
@@ -56,6 +59,7 @@ public class Schedule{
private boolean skipPastOccurrences = true;
private ExecutionOptions executionOptions;
+ private List<SlaOption> slaOptions;
public Schedule(
int scheduleId,
@@ -84,6 +88,7 @@ public class Schedule{
nextExecTime,
submitTime,
submitUser,
+ null,
null
);
}
@@ -101,7 +106,8 @@ public class Schedule{
long nextExecTime,
long submitTime,
String submitUser,
- ExecutionOptions executionOptions
+ ExecutionOptions executionOptions,
+ List<SlaOption> slaOptions
) {
this(scheduleId, projectId,
projectName,
@@ -114,7 +120,8 @@ public class Schedule{
nextExecTime,
submitTime,
submitUser,
- executionOptions
+ executionOptions,
+ slaOptions
);
}
@@ -131,7 +138,8 @@ public class Schedule{
long nextExecTime,
long submitTime,
String submitUser,
- ExecutionOptions executionOptions
+ ExecutionOptions executionOptions,
+ List<SlaOption> slaOptions
) {
this.scheduleId = scheduleId;
this.projectId = projectId;
@@ -146,15 +154,24 @@ public class Schedule{
this.status = status;
this.submitTime = submitTime;
this.executionOptions = executionOptions;
+ this.slaOptions = slaOptions;
}
public ExecutionOptions getExecutionOptions() {
return executionOptions;
}
+
+ public List<SlaOption> getSlaOptions() {
+ return slaOptions;
+ }
public void setFlowOptions(ExecutionOptions executionOptions) {
this.executionOptions = executionOptions;
}
+
+ public void setSlaOptions(List<SlaOption> slaOptions) {
+ this.slaOptions = slaOptions;
+ }
public String getScheduleName() {
return projectName + "." + flowName + " (" + projectId + ")";
@@ -328,7 +345,6 @@ public class Schedule{
return periodStr;
}
-
public Map<String,Object> optionsToObject() {
if(executionOptions != null ) {
HashMap<String, Object> schedObj = new HashMap<String, Object>();
@@ -336,6 +352,15 @@ public class Schedule{
if(executionOptions != null) {
schedObj.put("executionOptions", executionOptions.toObject());
}
+
+ if(slaOptions != null) {
+ List<Object> slaOptionsObject = new ArrayList<Object>();
+// schedObj.put("slaOptions", slaOptions.toObject());
+ for(SlaOption sla : slaOptions) {
+ slaOptionsObject.add(sla.toObject());
+ }
+ schedObj.put("slaOptions", slaOptionsObject);
+ }
return schedObj;
}
@@ -358,7 +383,17 @@ public class Schedule{
this.executionOptions = new ExecutionOptions();
this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
}
-
+
+ if(schedObj.containsKey("slaOptions")) {
+ List<Object> slaOptionsObject = (List<Object>) schedObj.get("slaOptions");
+ List<SlaOption> slaOptions = new ArrayList<SlaOption>();
+ for(Object slaObj : slaOptionsObject) {
+ slaOptions.add(SlaOption.fromObject(slaObj));
+ }
+ this.slaOptions = slaOptions;
+ }
+
+
}
public boolean isRecurring() {
src/java/azkaban/scheduler/ScheduleManager.java 102(+72 -30)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 4ce0e25..ad4104a 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -41,6 +41,7 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
+import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerStatus;
import azkaban.utils.Pair;
@@ -59,8 +60,9 @@ public class ScheduleManager implements TriggerAgent {
private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
- private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
+// private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
+ private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
private final ExecutorManager executorManager;
@@ -186,10 +188,14 @@ public class ScheduleManager implements TriggerAgent {
* @return
* @throws ScheduleManagerException
*/
- public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
- updateLocal();
- return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
- }
+// public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
+// updateLocal();
+// return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+// }
+ public Schedule getSchedule(int projectId, String flowId) throws ScheduleManagerException {
+ updateLocal();
+ return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+}
/**
* Returns the scheduled flow for the scheduleId
@@ -210,12 +216,18 @@ public class ScheduleManager implements TriggerAgent {
* @param id
* @throws ScheduleManagerException
*/
- public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
- Set<Schedule> schedules = getSchedules(projectId, flowId);
- if(schedules != null) {
- for(Schedule sched : schedules) {
- removeSchedule(sched);
- }
+// public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
+// Set<Schedule> schedules = getSchedules(projectId, flowId);
+// if(schedules != null) {
+// for(Schedule sched : schedules) {
+// removeSchedule(sched);
+// }
+// }
+// }
+ public synchronized void removeSchedule(int projectId, String flowId) throws ScheduleManagerException {
+ Schedule sched = getSchedule(projectId, flowId);
+ if(sched != null) {
+ removeSchedule(sched);
}
}
/**
@@ -226,13 +238,18 @@ public class ScheduleManager implements TriggerAgent {
public synchronized void removeSchedule(Schedule sched) {
Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
- Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
- if(schedules != null) {
- schedules.remove(sched);
- if(schedules.size() == 0) {
- scheduleIdentityPairMap.remove(identityPairMap);
- }
+// Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
+// if(schedules != null) {
+// schedules.remove(sched);
+// if(schedules.size() == 0) {
+// scheduleIdentityPairMap.remove(identityPairMap);
+// }
+// }
+ Schedule schedule = scheduleIdentityPairMap.get(identityPairMap);
+ if(schedule != null) {
+ scheduleIdentityPairMap.remove(identityPairMap);
}
+
scheduleIDMap.remove(sched.getScheduleId());
try {
@@ -284,7 +301,7 @@ public class ScheduleManager implements TriggerAgent {
final long submitTime,
final String submitUser
) {
- return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
+ return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
}
public Schedule scheduleFlow(
@@ -300,9 +317,10 @@ public class ScheduleManager implements TriggerAgent {
final long nextExecTime,
final long submitTime,
final String submitUser,
- ExecutionOptions execOptions
+ ExecutionOptions execOptions,
+ List<SlaOption> slaOptions
) {
- Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
+ Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
@@ -317,7 +335,11 @@ public class ScheduleManager implements TriggerAgent {
* @param flow
*/
private synchronized void internalSchedule(Schedule s) {
- Schedule existing = scheduleIDMap.get(s.getScheduleId());
+ //Schedule existing = scheduleIDMap.get(s.getScheduleId());
+ Schedule existing = null;
+ if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
+ existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+ }
if(!useExternalRunner) {
if (existing != null) {
this.runner.removeRunnerSchedule(existing);
@@ -326,12 +348,13 @@ public class ScheduleManager implements TriggerAgent {
this.runner.addRunnerSchedule(s);
}
scheduleIDMap.put(s.getScheduleId(), s);
- Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
- if(schedules == null) {
- schedules = new HashSet<Schedule>();
- scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
- }
- schedules.add(s);
+// Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+// if(schedules == null) {
+// schedules = new HashSet<Schedule>();
+// scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
+// }
+// schedules.add(s);
+ scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), s);
}
/**
@@ -340,14 +363,16 @@ public class ScheduleManager implements TriggerAgent {
* @param flow
*/
public synchronized void insertSchedule(Schedule s) {
- boolean exist = s.getScheduleId() != -1;
+ //boolean exist = s.getScheduleId() != -1;
+ Schedule exist = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
if(s.updateTime()) {
try {
- if(!exist) {
+ if(exist == null) {
loader.insertSchedule(s);
internalSchedule(s);
}
else{
+ s.setScheduleId(exist.getScheduleId());
loader.updateSchedule(s);
internalSchedule(s);
}
@@ -524,7 +549,24 @@ public class ScheduleManager implements TriggerAgent {
e.printStackTrace();
throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
}
-
+// SlaOptions slaOptions = runningSched.getSlaOptions();
+// if(slaOptions != null) {
+// logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+// // submit flow slas
+// List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+// for(SlaSetting set : slaOptions.getSettings()) {
+// if(set.getId().equals("")) {
+// DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+// slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+// }
+// else {
+// jobsettings.add(set);
+// }
+// }
+// if(jobsettings.size() > 0) {
+// slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+// }
+// }
}
catch (ExecutorManagerException e) {
if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index b79fc3f..ba6bdd0 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -10,15 +10,17 @@ import org.joda.time.DateTime;
import azkaban.executor.ExecutorManager;
import azkaban.project.ProjectManager;
+import azkaban.sla.SlaOption;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerException;
-import azkaban.trigger.TriggerStatus;
import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.CreateTriggerAction;
import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.SlaChecker;
public class TriggerBasedScheduleLoader implements ScheduleLoader {
@@ -44,7 +46,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Condition triggerCondition = createTimeTriggerCondition(s);
Condition expireCondition = createTimeExpireCondition(s);
List<TriggerAction> actions = createActions(s);
- Trigger t = new Trigger(new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+ Trigger t = new Trigger(s.getScheduleId(), new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
if(s.isRecurring()) {
t.setResetOnTrigger(true);
}
@@ -53,8 +55,18 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private List<TriggerAction> createActions (Schedule s) {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
- TriggerAction act = new ExecuteFlowAction(s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions());
- actions.add(act);
+ ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
+ actions.add(executeAct);
+// List<SlaOption> slaOptions = s.getSlaOptions();
+// if(slaOptions != null && slaOptions.size() > 0) {
+// // insert a trigger to keep watching that execution
+// for(SlaOption sla : slaOptions) {
+// // need to create triggers for each sla
+// SlaChecker slaChecker = new SlaChecker("slaChecker", sla, executeAct.getId());
+//
+// }
+// }
+
return actions;
}
@@ -110,7 +122,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
List<Schedule> schedules = new ArrayList<Schedule>();
// triggersLocalCopy = new HashMap<Integer, Trigger>();
for(Trigger t : triggers) {
-// triggersLocalCopy.put(t.getTriggerId(), t);
lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
Schedule s = triggerToSchedule(t);
schedules.add(s);
@@ -151,7 +162,9 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
t.getLastModifyTime().getMillis(),
ck.getNextCheckTime().getMillis(),
t.getSubmitTime().getMillis(),
- t.getSubmitUser());
+ t.getSubmitUser(),
+ act.getExecutionOptions(),
+ act.getSlaOptions());
return s;
} else {
logger.error("Failed to parse schedule from trigger!");
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 799c487..28b0328 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
@@ -31,12 +30,8 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
import azkaban.project.ProjectManager;
-import azkaban.trigger.Condition;
-import azkaban.trigger.ConditionChecker;
-import azkaban.trigger.Trigger;
-import azkaban.trigger.TriggerAction;
+import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerManager;
-import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Pair;
/**
@@ -147,7 +142,7 @@ public class TriggerBasedScheduler {
final long submitTime,
final String submitUser
) {
- return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
+ return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
}
public Schedule scheduleFlow(
@@ -163,9 +158,10 @@ public class TriggerBasedScheduler {
final long nextExecTime,
final long submitTime,
final String submitUser,
- ExecutionOptions execOptions
+ ExecutionOptions execOptions,
+ List<SlaOption> slaOptions
) {
- Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
+ Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
src/java/azkaban/sla/SlaOption.java 129(+129 -0)
diff --git a/src/java/azkaban/sla/SlaOption.java b/src/java/azkaban/sla/SlaOption.java
new file mode 100644
index 0000000..ad5b536
--- /dev/null
+++ b/src/java/azkaban/sla/SlaOption.java
@@ -0,0 +1,129 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.ReadablePeriod;
+
+import azkaban.utils.Utils;
+
+public class SlaOption {
+
+ public static final String TYPE_FLOW_FINISH = "FlowFinish";
+ public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
+ public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
+
+ public static final String TYPE_JOB_FINISH = "JobFinish";
+ public static final String TYPE_JOB_SUCCEED = "JobSucceed";
+ public static final String TYPE_JOB_PROGRESS = "JobProgress";
+
+ public static final String INFO_DURATION = "Duration";
+ public static final String INFO_FLOW_NAME = "FlowName";
+ public static final String INFO_JOB_NAME = "JobName";
+ public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
+ public static final String INFO_EMAIL_LIST = "EmailList";
+
+ // always alert
+ public static final String ALERT_TYPE = "SlaAlertType";
+ public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
+ public static final String ACTION_ALERT = "SlaAlert";
+
+ private String type;
+ private Map<String, Object> info;
+ private List<String> actions;
+
+ public SlaOption(
+ String type,
+ List<String> actions,
+ Map<String, Object> info
+ ) {
+ this.type = type;
+ this.info = info;
+ this.actions = actions;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Map<String, Object> getInfo() {
+ return info;
+ }
+
+ public void setInfo(Map<String, Object> info) {
+ this.info = info;
+ }
+
+ public List<String> getActions() {
+ return actions;
+ }
+
+ public void setActions(List<String> actions) {
+ this.actions = actions;
+ }
+
+ public Map<String,Object> toObject() {
+ HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+ slaObj.put("type", type);
+ slaObj.put("info", info);
+ slaObj.put("actions", actions);
+
+ return slaObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaOption fromObject(Object object) {
+
+ HashMap<String, Object> slaObj = (HashMap<String,Object>)object;
+
+ String type = (String) slaObj.get("type");
+ List<String> actions = (List<String>) slaObj.get("actions");
+ Map<String, Object> info = (Map<String, Object>) slaObj.get("info");
+
+ return new SlaOption(type, actions, info);
+ }
+
+ public Object toWebObject() {
+ HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+// slaObj.put("type", type);
+// slaObj.put("info", info);
+// slaObj.put("actions", actions);
+ if(type.equals(TYPE_FLOW_FINISH) || type.equals(TYPE_FLOW_SUCCEED)) {
+ slaObj.put("id", "");
+ } else {
+ slaObj.put("id", info.get(INFO_JOB_NAME));
+ }
+ slaObj.put("duration", info.get(INFO_DURATION));
+ if(type.equals(TYPE_FLOW_FINISH) || type.equals(TYPE_JOB_FINISH)) {
+ slaObj.put("rule", "FINISH");
+ } else {
+ slaObj.put("rule", "SUCCESS");
+ }
+ List<String> actionsObj = new ArrayList<String>();
+ for(String act : actions) {
+ if(act.equals(ACTION_ALERT)) {
+ actionsObj.add("EMAIL");
+ }
+ else {
+ actionsObj.add("KILL");
+ }
+ }
+ slaObj.put("actions", actionsObj);
+
+ return slaObj;
+ }
+
+ @Override
+ public String toString() {
+ return "Sla of " + getType() + getInfo() + getActions();
+ }
+
+}
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 1e4cc3c..4ee2ef3 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -2,7 +2,6 @@ package azkaban.trigger;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -32,7 +31,7 @@ public class ActionTypeLoader {
// load built-in actions
- loadDefaultActions();
+ loadBuiltinActions();
loadPluginActions(props);
@@ -135,11 +134,18 @@ public class ActionTypeLoader {
logger.info("Loaded action type " + actionName + " " + actionClass);
}
- private void loadDefaultActions() {
+ private void loadBuiltinActions() {
actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);
logger.info("Loaded ExecuteFlowAction type.");
}
+ public static void registerBuiltinActions(Map<String, Class<? extends TriggerAction>> builtinActions) {
+ actionToClass.putAll(builtinActions);
+ for(String type : builtinActions.keySet()) {
+ logger.info("Loaded " + type + " action.");
+ }
+ }
+
public TriggerAction createActionFromJson(String type, Object obj) throws Exception {
TriggerAction action = null;
Class<? extends TriggerAction> actionClass = actionToClass.get(type);
diff --git a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 05e0722..6071019 100644
--- a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -157,7 +157,12 @@ public class BasicTimeChecker implements ConditionChecker {
boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
String id = (String) jsonObj.get("id");
- return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+
+ BasicTimeChecker checker = new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+ if(skipPastChecks) {
+ checker.updateNextCheckTime();
+ }
+ return checker;
}
@Override
@@ -203,10 +208,14 @@ public class BasicTimeChecker implements ConditionChecker {
// return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
// }
+ private void updateNextCheckTime(){
+ nextCheckTime = calculateNextCheckTime();
+ }
+
private DateTime calculateNextCheckTime(){
DateTime date = new DateTime(nextCheckTime);
int count = 0;
- while(!DateTime.now().isBefore(date) && skipPastChecks) {
+ while(!DateTime.now().isBefore(date)) {
if(count > 100000) {
throw new IllegalStateException("100000 increments of period did not get to present time.");
}
@@ -217,6 +226,9 @@ public class BasicTimeChecker implements ConditionChecker {
date = date.plus(period);
}
count += 1;
+ if(!skipPastChecks) {
+ continue;
+ }
}
return date;
}
@@ -247,4 +259,10 @@ public class BasicTimeChecker implements ConditionChecker {
return;
}
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // TODO Auto-generated method stub
+
+ }
+
}
diff --git a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
index e231023..4f8baac 100644
--- a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
+++ b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
@@ -12,8 +12,11 @@ public class CreateTriggerAction implements TriggerAction {
public static final String type = "CreateTriggerAction";
private static TriggerRunnerManager triggerRunnerManager;
private Trigger trigger;
-
- public CreateTriggerAction(Trigger trigger) {
+ private Map<String, Object> context;
+ private String actionId;
+
+ public CreateTriggerAction(String actionId, Trigger trigger) {
+ this.actionId = actionId;
this.trigger = trigger;
}
@@ -32,8 +35,9 @@ public class CreateTriggerAction implements TriggerAction {
if(!jsonObj.get("type").equals(type)) {
throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
}
+ String actionId = (String) jsonObj.get("actionId");
Trigger trigger = Trigger.fromJson(jsonObj.get("trigger"));
- return new CreateTriggerAction(trigger);
+ return new CreateTriggerAction(actionId, trigger);
}
@Override
@@ -45,6 +49,7 @@ public class CreateTriggerAction implements TriggerAction {
@Override
public Object toJson() {
Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
jsonObj.put("type", type);
jsonObj.put("trigger", trigger.toJson());
@@ -61,4 +66,14 @@ public class CreateTriggerAction implements TriggerAction {
return "create another: " + trigger.getDescription();
}
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ this.context = context;
+ }
+
}
src/java/azkaban/trigger/builtin/ExecuteFlowAction.java 145(+108 -37)
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index a96dfcd..1ff52cb 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -1,6 +1,8 @@
package azkaban.trigger.builtin;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -12,28 +14,41 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
+import azkaban.triggerapp.TriggerRunnerManager;
public class ExecuteFlowAction implements TriggerAction {
public static final String type = "ExecuteFlowAction";
+
+ public static final String EXEC_ID = "ExecuteFlowAction.execid";
private static ExecutorManager executorManager;
+ private static TriggerRunnerManager triggerRunnerManager;
+ private String actionId;
private int projectId;
private String projectName;
private String flowName;
private String submitUser;
private static ProjectManager projectManager;
- private ExecutionOptions executionOptions;
-
+ private ExecutionOptions executionOptions = new ExecutionOptions();
+ private List<SlaOption> slaOptions;
+ private Map<String, Object> context;
+
private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
- public ExecuteFlowAction(int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions) {
+ public ExecuteFlowAction(String actionId, int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions, List<SlaOption> slaOptions) {
+ this.actionId = actionId;
this.projectId = projectId;
this.projectName = projectName;
this.flowName = flowName;
this.submitUser = submitUser;
this.executionOptions = executionOptions;
+ this.slaOptions = slaOptions;
}
public static void setLogger(Logger logger) {
@@ -75,6 +90,14 @@ public class ExecuteFlowAction implements TriggerAction {
public void setExecutionOptions(ExecutionOptions executionOptions) {
this.executionOptions = executionOptions;
}
+
+ public List<SlaOption> getSlaOptions() {
+ return slaOptions;
+ }
+
+ public void setSlaOptions(List<SlaOption> slaOptions) {
+ this.slaOptions = slaOptions;
+ }
public static ExecutorManager getExecutorManager() {
return executorManager;
@@ -83,6 +106,14 @@ public class ExecuteFlowAction implements TriggerAction {
public static void setExecutorManager(ExecutorManager executorManager) {
ExecuteFlowAction.executorManager = executorManager;
}
+
+ public static TriggerRunnerManager getTriggerRunnerManager() {
+ return triggerRunnerManager;
+ }
+
+ public static void setTriggerRunnerManager(TriggerRunnerManager triggerRunnerManager) {
+ ExecuteFlowAction.triggerRunnerManager = triggerRunnerManager;
+ }
public static ProjectManager getProjectManager() {
return projectManager;
@@ -100,59 +131,55 @@ public class ExecuteFlowAction implements TriggerAction {
@SuppressWarnings("unchecked")
@Override
public TriggerAction fromJson(Object obj) {
- Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
- String type = (String) jsonObj.get("type");
- if(! type.equals(ExecuteFlowAction.type)) {
- throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
- }
- int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
- String projectName = (String) jsonObj.get("projectName");
- String flowName = (String) jsonObj.get("flowName");
- String submitUser = (String) jsonObj.get("submitUser");
- ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
- return new ExecuteFlowAction(projectId, projectName, flowName, submitUser, executionOptions);
+ return createFromJson((HashMap<String, Object>) obj);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static TriggerAction createFromJson(HashMap obj) {
- Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
- String type = (String) jsonObj.get("type");
- if(! type.equals(ExecuteFlowAction.type)) {
- throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
- }
- int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
- String projectName = (String) jsonObj.get("projectName");
- String flowName = (String) jsonObj.get("flowName");
- String submitUser = (String) jsonObj.get("submitUser");
- ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
- return new ExecuteFlowAction(projectId, projectName, flowName, submitUser, executionOptions);
- }
-
@SuppressWarnings("unchecked")
- public static TriggerAction createFromJson(Object obj) {
+ public static TriggerAction createFromJson(HashMap<String, Object> obj) {
Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
- String type = (String) jsonObj.get("type");
- if(! type.equals(ExecuteFlowAction.type)) {
- throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+ String objType = (String) jsonObj.get("type");
+ if(! objType.equals(type)) {
+ throw new RuntimeException("Cannot create action of " + type + " from " + objType);
}
+ String actionId = (String) jsonObj.get("actionId");
int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
String projectName = (String) jsonObj.get("projectName");
String flowName = (String) jsonObj.get("flowName");
String submitUser = (String) jsonObj.get("submitUser");
- ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
- return new ExecuteFlowAction(projectId, projectName, flowName, submitUser, executionOptions);
+ ExecutionOptions executionOptions = null;
+ if(jsonObj.containsKey("executionOptions")) {
+ executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+ }
+ List<SlaOption> slaOptions = null;
+ if(jsonObj.containsKey("slaOptions")) {
+ slaOptions = new ArrayList<SlaOption>();
+ List<Object> slaOptionsObj = (List<Object>) jsonObj.get("slaOptions");
+ for(Object slaObj : slaOptionsObj) {
+ slaOptions.add(SlaOption.fromObject(slaObj));
+ }
+ }
+ return new ExecuteFlowAction(actionId, projectId, projectName, flowName, submitUser, executionOptions, slaOptions);
}
@Override
public Object toJson() {
Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
jsonObj.put("type", type);
jsonObj.put("projectId", String.valueOf(projectId));
jsonObj.put("projectName", projectName);
jsonObj.put("flowName", flowName);
jsonObj.put("submitUser", submitUser);
- jsonObj.put("executionOptions", executionOptions.toObject());
-
+ if(executionOptions != null) {
+ jsonObj.put("executionOptions", executionOptions.toObject());
+ }
+ if(slaOptions != null) {
+ List<Object> slaOptionsObj = new ArrayList<Object>();
+ for(SlaOption sla : slaOptions) {
+ slaOptionsObj.add(sla.toObject());
+ }
+ jsonObj.put("slaOptions", slaOptionsObj);
+ }
return jsonObj;
}
@@ -188,11 +215,45 @@ public class ExecuteFlowAction implements TriggerAction {
try{
executorManager.submitExecutableFlow(exflow);
+ Map<String, Object> outputProps = new HashMap<String, Object>();
+ outputProps.put(EXEC_ID, exflow.getExecutionId());
+ context.put(actionId, outputProps);
logger.info("Invoked flow " + project.getName() + "." + flowName);
} catch (ExecutorManagerException e) {
throw new RuntimeException(e);
}
+ // deal with sla
+ if(slaOptions != null && slaOptions.size() > 0) {
+ int execId = exflow.getExecutionId();
+ for(SlaOption sla : slaOptions) {
+ logger.info("Adding sla trigger " + sla.toString());
+ SlaChecker slaFailChecker = new SlaChecker("slaFailChecker", sla, execId, false);
+ Map<String, ConditionChecker> failCheckers = new HashMap<String, ConditionChecker>();
+ failCheckers.put(slaFailChecker.getId(), slaFailChecker);
+ Condition triggerCond = new Condition(failCheckers, slaFailChecker.getId() + ".eval()");
+ SlaChecker slaPassChecker = new SlaChecker("slaPassChecker", sla, execId, true);
+ Map<String, ConditionChecker> passCheckers = new HashMap<String, ConditionChecker>();
+ passCheckers.put(slaPassChecker.getId(), slaPassChecker);
+ Condition expireCond = new Condition(passCheckers, slaPassChecker.getId() + ".eval()");
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ List<String> slaActions = sla.getActions();
+ for(String act : slaActions) {
+ if(act.equals(SlaOption.ACTION_ALERT)) {
+ SlaAlertAction slaAlert = new SlaAlertAction("slaAlert", sla, execId);
+ actions.add(slaAlert);
+ } else if(act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
+ KillExecutionAction killAct = new KillExecutionAction("killExecution", execId);
+ actions.add(killAct);
+ }
+ }
+ Trigger slaTrigger = new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond, actions);
+ slaTrigger.setResetOnTrigger(false);
+ slaTrigger.setResetOnExpire(false);
+ triggerRunnerManager.insertTrigger(slaTrigger);
+ }
+ }
+
}
@Override
@@ -201,5 +262,15 @@ public class ExecuteFlowAction implements TriggerAction {
" from project " + getProjectName();
}
+ @Override
+ public void setContext(Map<String, Object> context) {
+ this.context = context;
+ }
+
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
}
diff --git a/src/java/azkaban/trigger/builtin/KillExecutionAction.java b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
new file mode 100644
index 0000000..1da405d
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -0,0 +1,93 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.trigger.TriggerAction;
+
+public class KillExecutionAction implements TriggerAction{
+
+ public static final String type = "KillExecutionAction";
+
+ private static final Logger logger = Logger.getLogger(KillExecutionAction.class);
+
+ private String actionId;
+ private int execId;
+ private static ExecutorManager executorManager;
+
+ public KillExecutionAction(String actionId, int execId) {
+ this.execId = execId;
+ this.actionId = actionId;
+ }
+
+ public static void setExecutorManager(ExecutorManager em) {
+ executorManager = em;
+ }
+
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static KillExecutionAction createFromJson(Object obj) {
+ return createFromJson((HashMap<String, Object>)obj);
+ }
+
+ public static KillExecutionAction createFromJson(HashMap<String, Object> obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ String objType = (String) jsonObj.get("type");
+ if(! objType.equals(type)) {
+ throw new RuntimeException("Cannot create action of " + type + " from " + objType);
+ }
+ String actionId = (String) jsonObj.get("actionId");
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
+ return new KillExecutionAction(actionId, execId);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KillExecutionAction fromJson(Object obj) throws Exception {
+ return createFromJson((HashMap<String, Object>)obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
+ jsonObj.put("type", type);
+ jsonObj.put("execId", String.valueOf(execId));
+ return jsonObj;
+ }
+
+ @Override
+ public void doAction() throws Exception {
+ ExecutableFlow exFlow = executorManager.getExecutableFlow(execId);
+ logger.info("ready to kill execution " + execId);
+ if(!executorManager.isFinished(exFlow)) {
+ logger.info("Killing execution " + execId);
+ executorManager.cancelFlow(exFlow, "azkaban_sla");
+ }
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getDescription() {
+ return type + " for " + execId;
+ }
+
+}
diff --git a/src/java/azkaban/trigger/builtin/SendEmailAction.java b/src/java/azkaban/trigger/builtin/SendEmailAction.java
new file mode 100644
index 0000000..ec946c2
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/SendEmailAction.java
@@ -0,0 +1,99 @@
+package azkaban.trigger.builtin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.utils.AbstractMailer;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Props;
+
+public class SendEmailAction implements TriggerAction {
+
+ private static final Logger logger = Logger.getLogger(SendEmailAction.class);
+
+ private String actionId;
+ private Map<String, Object> context;
+ private static AbstractMailer mailer;
+ private String message;
+ public static final String type = "SendEmailAction";
+ private String mimetype = "text/html";
+ private List<String> emailList;
+ private String subject;
+
+ public static void init(Props props) {
+ mailer = new AbstractMailer(props);
+ }
+
+ public SendEmailAction(String actionId, String subject, String message, List<String> emailList) {
+ this.actionId = actionId;
+ this.message = message;
+ this.subject = subject;
+ this.emailList = emailList;
+ }
+
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SendEmailAction createFromJson(Object obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
+ }
+ String actionId = (String) jsonObj.get("actionId");
+ String subject = (String) jsonObj.get("subject");
+ String message = (String) jsonObj.get("message");
+ List<String> emailList = (List<String>) jsonObj.get("emailList");
+ return new SendEmailAction(actionId, subject, message, emailList);
+ }
+
+ @Override
+ public TriggerAction fromJson(Object obj) throws Exception {
+ return createFromJson(obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
+ jsonObj.put("type", type);
+ jsonObj.put("subject", subject);
+ jsonObj.put("message", message);
+ jsonObj.put("emailList", emailList);
+
+ return jsonObj;
+ }
+
+ @Override
+ public void doAction() throws Exception {
+ EmailMessage email = mailer.prepareEmailMessage(subject, mimetype, emailList);
+ email.setBody(message);
+ email.sendEmail();
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+
+ }
+
+ @Override
+ public String getDescription() {
+ return type;
+ }
+
+
+}
src/java/azkaban/trigger/builtin/SlaAlertAction.java 170(+170 -0)
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
new file mode 100644
index 0000000..ead2124
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -0,0 +1,170 @@
+package azkaban.trigger.builtin;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorMailer;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManager.Alerter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.TriggerAction;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+
+public class SlaAlertAction implements TriggerAction{
+
+ public static final String type = "AlertAction";
+
+ private static final Logger logger = Logger.getLogger(SlaAlertAction.class);
+
+ private String actionId;
+ private SlaOption slaOption;
+ private int execId;
+// private List<Map<String, Object>> alerts;
+ private static Map<String, Alerter> alerters;
+ private Map<String, Object> context;
+ private static ExecutorManager executorManager;
+
+ public SlaAlertAction(String id, SlaOption slaOption, int execId) {
+ this.actionId = id;
+ this.slaOption = slaOption;
+ this.execId = execId;
+// this.alerts = alerts;
+ }
+
+ public static void setAlerters(Map<String, Alerter> alts) {
+ alerters = alts;
+ }
+
+ public static void setExecutorManager(ExecutorManager em) {
+ executorManager = em;
+ }
+
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaAlertAction createFromJson(Object obj) throws Exception {
+ return createFromJson((HashMap<String, Object>) obj);
+ }
+
+ public static SlaAlertAction createFromJson(HashMap<String, Object> obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
+ }
+ String actionId = (String) jsonObj.get("actionId");
+ SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
+// List<Map<String, Object>> alerts = (List<Map<String, Object>>) jsonObj.get("alerts");
+ return new SlaAlertAction(actionId, slaOption, execId);
+ }
+
+ @Override
+ public TriggerAction fromJson(Object obj) throws Exception {
+ return createFromJson(obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
+ jsonObj.put("type", type);
+ jsonObj.put("slaOption", slaOption.toObject());
+ jsonObj.put("execId", String.valueOf(execId));
+// jsonObj.put("alerts", alerts);
+
+ return jsonObj;
+ }
+
+ @Override
+ public void doAction() throws Exception {
+// for(Map<String, Object> alert : alerts) {
+ logger.info("Alerting on sla failure.");
+ Map<String, Object> alert = slaOption.getInfo();
+ if(alert.containsKey(SlaOption.ALERT_TYPE)) {
+ String alertType = (String) alert.get(SlaOption.ALERT_TYPE);
+ Alerter alerter = alerters.get(alertType);
+ if(alerter != null) {
+ try {
+ alerter.alertOnSla(slaOption, createSlaMessage());
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to alert by " + alertType);
+ }
+ }
+ else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
+// }
+ }
+
+ private String createSlaMessage() {
+ ExecutableFlow flow = null;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ e.printStackTrace();
+ logger.error("Failed to get executable flow.");
+ }
+ String type = slaOption.getType();
+ if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+ String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ String basicinfo = "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
+ String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>";
+ String actual = "Actual flow status is " + flow.getStatus();
+ return basicinfo + expected + actual;
+ } else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ String basicinfo = "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
+ String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>";
+ String actual = "Actual flow status is " + flow.getStatus();
+ return basicinfo + expected + actual;
+ } else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
+ } else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
+ } else {
+ return "Unrecognized SLA type " + type;
+ }
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ this.context = context;
+ }
+
+ @Override
+ public String getDescription() {
+ return type + " with " + slaOption.toString();
+ }
+
+}
src/java/azkaban/trigger/builtin/SlaChecker.java 229(+229 -0)
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
new file mode 100644
index 0000000..e9826eb
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -0,0 +1,229 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.ConditionChecker;
+import azkaban.utils.Utils;
+
+public class SlaChecker implements ConditionChecker{
+
+ private static final Logger logger = Logger.getLogger(SlaChecker.class);
+ public static final String type = "SlaChecker";
+
+ private String id;
+ private SlaOption slaOption;
+ private int execId;
+ private Map<String, Object> context;
+ private boolean passChecker = true;
+
+ private static ExecutorManager executorManager;
+
+ public SlaChecker(String id, SlaOption slaOption, int execId, boolean passChecker) {
+ this.id = id;
+ this.slaOption = slaOption;
+ this.execId = execId;
+ this.passChecker = passChecker;
+ }
+
+ public SlaChecker(String id, SlaOption sla, String executionActionId, boolean passChecker) {
+ Map<String, Object> executeActionProps = (Map<String, Object>) context.get(executionActionId);
+ int execId = Integer.valueOf((String) executeActionProps.get(ExecuteFlowAction.EXEC_ID));
+ this.id = id;
+ this.slaOption = sla;
+ this.execId = execId;
+ this.passChecker = passChecker;
+ }
+
+ public static void setExecutorManager(ExecutorManager em) {
+ executorManager = em;
+ }
+
+ private Boolean metSla(ExecutableFlow flow) {
+ String type = slaOption.getType();
+ logger.info("Checking for " + flow.getExecutionId() + " with sla " + type);
+ logger.info("flow is " + flow.getStatus());
+ if(flow.getStartTime() < 0) {
+ return null;
+ }
+ if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime checkTime = startTime.plus(duration);
+ if(checkTime.isBeforeNow()) {
+ Status status = flow.getStatus();
+ if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+ } else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(flow.getStartTime());
+ DateTime checkTime = startTime.plus(duration);
+ if(checkTime.isBeforeNow()) {
+ Status status = flow.getStatus();
+ if(status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+ } else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if(node.getStartTime() > 0) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime checkTime = startTime.plus(duration);
+ if(checkTime.isBeforeNow()) {
+ Status status = node.getStatus();
+ if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+ }
+ } else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ ExecutableNode node = flow.getExecutableNode(jobName);
+ if(node.getStartTime() > 0) {
+ ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(node.getStartTime());
+ DateTime checkTime = startTime.plus(duration);
+ if(checkTime.isBeforeNow()) {
+ Status status = node.getStatus();
+ if(status.equals(Status.SUCCEEDED)) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+ }
+ }
+// else if(type.equals(SlaOption.TYPE_JOB_PROGRESS)) {
+// String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+// float targetProgress = Float.valueOf((String) slaOption.getInfo().get(SlaOption.INFO_PROGRESS_PERCENT));
+// ExecutableNode node = flow.getExecutableNode(jobName);
+// if(node.getStartTime() > 0) {
+// ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+// DateTime startTime = new DateTime(node.getStartTime());
+// DateTime checkTime = startTime.plus(duration);
+// if(checkTime.isBeforeNow()) {
+// if(node.getProgress() > targetProgress) {
+// return Boolean.FALSE;
+// } else {
+// return Boolean.TRUE;
+// }
+// }
+// } else {
+// return Boolean.FALSE;
+// }
+// }
+ return null;
+ }
+
+ // return true for should do sla actions
+ @Override
+ public Object eval() {
+ ExecutableFlow flow;
+ try {
+ flow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ logger.error("Can't get executable flow.", e);
+ e.printStackTrace();
+ return Boolean.TRUE;
+ }
+ Boolean metSla = metSla(flow);
+ if(metSla == null) {
+ return Boolean.FALSE;
+ } else {
+ if(passChecker) {
+ return metSla;
+ } else {
+ return !metSla;
+ }
+ }
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public ConditionChecker fromJson(Object obj) throws Exception {
+ return createFromJson(obj);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaChecker createFromJson(Object obj) throws Exception {
+ return createFromJson((HashMap<String, Object>)obj);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaChecker createFromJson(HashMap<String, Object> obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+ }
+ String id = (String) jsonObj.get("id");
+ SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
+ boolean passChecker = Boolean.valueOf((Boolean) jsonObj.get("passChecker"));
+ return new SlaChecker(id, slaOption, execId, passChecker);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("id", id);
+ jsonObj.put("slaOption", slaOption.toObject());
+ jsonObj.put("execId", String.valueOf(execId));
+ jsonObj.put("passChecker", passChecker);
+
+ return jsonObj;
+ }
+
+ @Override
+ public void stopChecker() {
+
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ this.context = context;
+ }
+
+}
src/java/azkaban/trigger/builtin/WatchSlaAction.java 206(+206 -0)
diff --git a/src/java/azkaban/trigger/builtin/WatchSlaAction.java b/src/java/azkaban/trigger/builtin/WatchSlaAction.java
new file mode 100644
index 0000000..d9a549b
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/WatchSlaAction.java
@@ -0,0 +1,206 @@
+package azkaban.trigger.builtin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.Status;
+import azkaban.flow.CommonJobProperties;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.utils.Utils;
+
+public class WatchSlaAction implements TriggerAction{
+
+ private static final Logger logger = Logger.getLogger(WatchSlaAction.class);
+ public static final String type = "SetSlaWatcherAction";
+
+ private Map<String, Object> context;
+ private List<SlaOption> slaOptions;
+ private String actionId;
+ private String executionActionId;
+ private static ExecutorManager executorManager;
+ private static TriggerRunnerManager trigggerRunnerManager;
+
+ public WatchSlaAction(String actionId, List<SlaOption> slaOptions, String executionActionId) {
+ this.actionId = actionId;
+ this.slaOptions = slaOptions;
+ this.executionActionId = executionActionId;
+ }
+
+ public static void setExecutorManager(ExecutorManager em) {
+ executorManager = em;
+ }
+
+ public static void setTriggerRunnerManager(TriggerRunnerManager trm) {
+ trigggerRunnerManager = trm;
+ }
+
+ @Override
+ public String getId() {
+ return actionId;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static WatchSlaAction createFromJson(Object obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ String objType = (String) jsonObj.get("type");
+ if(! objType.equals(type)) {
+ throw new RuntimeException("Cannot create action of " + type + " from " + objType);
+ }
+ String actionId = (String) jsonObj.get("actionId");
+ String executionActionId = (String) jsonObj.get("executionActionId");
+ List<SlaOption> slaOptions = new ArrayList<SlaOption>();
+ List<Object> slaOptionsObj = (List<Object>) jsonObj.get("slaOptions");
+ for(Object slaObj : slaOptionsObj) {
+ slaOptions.add(SlaOption.fromObject(slaObj));
+ }
+ return new WatchSlaAction(actionId, slaOptions, executionActionId);
+
+ }
+
+ @Override
+ public TriggerAction fromJson(Object obj) throws Exception {
+ return createFromJson(obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("actionId", actionId);
+ jsonObj.put("type", type);
+ jsonObj.put("executionActionId", executionActionId);
+ List<Object> slaOptionsObj = new ArrayList<Object>();
+ for(SlaOption sla : slaOptions) {
+ slaOptionsObj.add(sla.toObject());
+ }
+ jsonObj.put("slaOptions", slaOptionsObj);
+
+ return jsonObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void doAction() throws Exception {
+
+ if(executorManager == null) {
+ throw new Exception("ExecutorManager not initialized. Failed to set sla.");
+ }
+ if(trigggerRunnerManager == null) {
+ throw new Exception("TriggerRunnerManager not initialized. Failed to set sla.");
+ }
+
+ if(!context.containsKey(executionActionId)) {
+ throw new Exception("No trace of the execution. Cannot set sla if the flow/job is not run.");
+ }
+ Map<String, Object> executionInfo = (Map<String, Object>) context.get(executionActionId);
+ if(!executionInfo.containsKey(ExecuteFlowAction.EXEC_ID)) {
+ throw new Exception("Execution Id not set. Failed to set sla.");
+ }
+ int execId = Integer.valueOf((String) executionInfo.get(ExecuteFlowAction.EXEC_ID));
+ ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
+
+ for(SlaOption sla : slaOptions) {
+ if(sla.getType().equals(SlaOption.TYPE_FLOW_FINISH)) {
+ // just do a time checker and see if the flow finish
+ ConditionChecker timer = createFlowSlaTimer(exflow, sla);
+ ExecutionChecker statusChecker = new ExecutionChecker("slaStatusChecker1", execId, ExecutionChecker.TARGET_FINISHED, null);
+ String failExpr = timer.getId() + ".eval() && !" + statusChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ checkers.put(timer.getId(), timer);
+ checkers.put(statusChecker.getId(), statusChecker);
+ Condition triggerCondition = new Condition(checkers, failExpr);
+ List<String> slaActions = sla.getActions();
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ // always send email
+ SendEmailAction emailAct = new SendEmailAction("sendSlaAlertEmail", getSlaEmailSubject(exflow), getSlaEmailMessage(exflow, sla), (List<String>) sla.getInfo().get("emailList"));
+ actions.add(emailAct);
+ for(String act : slaActions) {
+ if(act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
+ KillExecutionAction killAct = new KillExecutionAction("slaKiller1", exflow.getExecutionId());
+ actions.add(killAct);
+ }
+// else if(act.equals(SlaOption.ACTION_ALERT_BY_EMAIL)) {
+// SendEmailAction emailAct = new SendEmailAction("sendSlaAlertEmail", getSlaEmailSubject(exflow), getSlaEmailMessage(exflow, sla), (List<String>) sla.getInfo().get("emailList"));
+// actions.add(emailAct);
+// }
+ }
+
+ Trigger t = new Trigger("azkaban", "triggerserver", triggerCondition, triggerCondition, actions);
+ t.setResetOnTrigger(false);
+ t.setResetOnExpire(false);
+ trigggerRunnerManager.insertTrigger(t);
+ } else if(sla.getType().equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+// String jobName = (String) sla.getInfo().get("jobName");
+// ExecutableNode exNode = exflow.getExecutableNode(jobName);
+// ConditionChecker timer = createJobSlaTimer(exNode, sla);
+// if(exNode.getStatus().equals(Status.RUNNING)) {
+//
+// }
+ } else {
+ logger.error("Unknown sla type.");
+ }
+ }
+ }
+
+ private BasicTimeChecker createFlowSlaTimer(ExecutableFlow exflow, SlaOption sla) {
+ Map<String, Object> info = sla.getInfo();
+ ReadablePeriod duration = Utils.parsePeriodString((String) info.get(SlaOption.INFO_DURATION));
+ DateTime startTime = new DateTime(exflow.getSubmitTime());
+ DateTime checkTime = startTime.plus(duration);
+ BasicTimeChecker timeChecker = new BasicTimeChecker("slaTimer1", checkTime, checkTime.getZone(), false, false, null);
+ return timeChecker;
+ }
+
+// private BasicTimeChecker createJobSlaTimer(ExecutableNode exNode, SlaOption sla) {
+// Map<String, Object> info = sla.getInfo();
+// ReadablePeriod runtimelimit = Utils.parsePeriodString((String) info.get(SlaOption.INFO_RUNTIMELIMIT));
+// DateTime startTime = new DateTime(exNode.getStartTime());
+// DateTime checkTime = startTime.plus(runtimelimit);
+// BasicTimeChecker timeChecker = new BasicTimeChecker("slaTimer1", checkTime, checkTime.getZone(), false, false, null);
+// return timeChecker;
+// }
+
+ private String getSlaEmailSubject(ExecutableFlow flow) {
+ return "A preset SLA on flow " + flow.getFlowId() + " is not met in execution " + flow.getExecutionId();
+ }
+
+ private String getSlaEmailMessage(ExecutableFlow flow, SlaOption slaOption) {
+ if(slaOption.getType().equals(SlaOption.TYPE_FLOW_FINISH)) {
+ return "<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has exceeded its expected finish time'" + "</h2>";
+ }
+ if(slaOption.getType().equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ return "<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has exceeded its expected time to succeed'" + "</h2>";
+ }
+ return "Unrecognized SLA type.";
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ this.context = context;
+ }
+
+ @Override
+ public String getDescription() {
+ return type;
+ }
+
+}
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 74f9ddf..45fc8ff 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -2,7 +2,6 @@ package azkaban.trigger;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -10,7 +9,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.log4j.Logger;
@@ -34,7 +32,7 @@ public class CheckerTypeLoader {
// load built-in checkers
- loadDefaultCheckers();
+ loadBuiltinCheckers();
loadPluginCheckers(props);
@@ -138,7 +136,14 @@ public class CheckerTypeLoader {
logger.info("Loaded checker type " + checkerName + " " + checkerClass);
}
- private void loadDefaultCheckers() {
+ public static void registerBuiltinCheckers(Map<String, Class<? extends ConditionChecker>> builtinCheckers) {
+ checkerToClass.putAll(checkerToClass);
+ for(String type : builtinCheckers.keySet()) {
+ logger.info("Loaded " + type + " checker.");
+ }
+ }
+
+ private void loadBuiltinCheckers() {
checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
logger.info("Loaded BasicTimeChecker type.");
}
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index fcc1fa0..451285b 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -106,9 +106,9 @@ public class Condition {
ConditionChecker ck = checkerLoader.createCheckerFromJson(type, oneChecker.get("checkerJson"));
checkers.put(ck.getId(), ck);
}
- String expr = (String) jsonObj.get("expression");
+ String expr = (String) jsonObj.get("expression");
- cond = new Condition(checkers, expr);
+ cond = new Condition(checkers, expr);
} catch(Exception e) {
e.printStackTrace();
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 920eddf..85b4003 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -1,5 +1,7 @@
package azkaban.trigger;
+import java.util.Map;
+
public interface ConditionChecker {
@@ -19,4 +21,6 @@ public interface ConditionChecker {
void stopChecker();
+ void setContext(Map<String, Object> context);
+
}
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index 37f8131..658594b 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -7,7 +7,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
@@ -142,6 +141,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
@Override
public void addTrigger(Trigger t) throws TriggerManagerException {
logger.info("Inserting trigger " + t.toString() + " into db.");
+ t.setLastModifyTime(DateTime.now());
Connection connection = getConnection();
try {
addTrigger(connection, t, defaultEncodingType);
@@ -181,6 +181,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
@Override
public void updateTrigger(Trigger t) throws TriggerManagerException {
logger.info("Updating trigger " + t.toString() + " into db.");
+ t.setLastModifyTime(DateTime.now());
Connection connection = getConnection();
try{
t.setLastModifyTime(DateTime.now());
@@ -224,6 +225,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
t.getTriggerId());
if (updates == 0) {
throw new TriggerManagerException("No trigger has been updated.");
+ //logger.error("No trigger is updated!");
}
} catch (SQLException e) {
logger.error(UPDATE_TRIGGER + " failed.");
@@ -258,8 +260,8 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
ArrayList<Trigger> triggers = new ArrayList<Trigger>();
do {
int triggerId = rs.getInt(1);
- String triggerSource = rs.getString(2);
- long modifyTime = rs.getLong(3);
+// String triggerSource = rs.getString(2);
+// long modifyTime = rs.getLong(3);
int encodingType = rs.getInt(4);
byte[] data = rs.getBytes(5);
@@ -331,8 +333,8 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
}
if(triggers.size() == 0) {
- logger.error("Failed to load trigger " + triggerId);
- throw new TriggerManagerException("Failed to load trigger " + triggerId);
+ logger.error("Loaded 0 triggers. Failed to load trigger " + triggerId);
+ throw new TriggerManagerException("Loaded 0 triggers. Failed to load trigger " + triggerId);
}
return triggers.get(0);
src/java/azkaban/trigger/Trigger.java 87(+84 -3)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 962cf7a..d668b5e 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -25,6 +25,7 @@ public class Trigger {
private List<TriggerAction> expireActions;
private Map<String, Object> info = new HashMap<String, Object>();
+ private Map<String, Object> context = new HashMap<String, Object>();
private static ActionTypeLoader actionTypeLoader;
@@ -75,7 +76,15 @@ public class Trigger {
public void setInfo(Map<String, Object> info) {
this.info = info;
}
+
+ public Map<String, Object> getContext() {
+ return context;
+ }
+ public void setContext(Map<String, Object> context) {
+ this.context = context;
+ }
+
public Trigger(
DateTime lastModifyTime,
DateTime submitTime,
@@ -85,7 +94,8 @@ public class Trigger {
Condition expireCondition,
List<TriggerAction> actions,
List<TriggerAction> expireActions,
- Map<String, Object> info) {
+ Map<String, Object> info,
+ Map<String, Object> context) {
this.lastModifyTime = lastModifyTime;
this.submitTime = submitTime;
this.submitUser = submitUser;
@@ -95,6 +105,7 @@ public class Trigger {
this.actions = actions;
this.expireActions = expireActions;
this.info = info;
+ this.context = context;
}
public Trigger(
@@ -117,6 +128,39 @@ public class Trigger {
}
public Trigger(
+ String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions,
+ List<TriggerAction> expireActions) {
+ this.lastModifyTime = DateTime.now();
+ this.submitTime = DateTime.now();
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.expireActions = expireActions;
+ }
+
+ public Trigger(
+ String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions) {
+ this.lastModifyTime = DateTime.now();
+ this.submitTime = DateTime.now();
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.expireActions = new ArrayList<TriggerAction>();
+ }
+
+ public Trigger(
DateTime lastModifyTime,
DateTime submitTime,
String submitUser,
@@ -144,7 +188,8 @@ public class Trigger {
Condition expireCondition,
List<TriggerAction> actions,
List<TriggerAction> expireActions,
- Map<String, Object> info) {
+ Map<String, Object> info,
+ Map<String, Object> context) {
this.triggerId = triggerId;
this.lastModifyTime = lastModifyTime;
this.submitTime = submitTime;
@@ -155,6 +200,7 @@ public class Trigger {
this.actions = actions;
this.expireActions = expireActions;
this.info = info;
+ this.context = context;
}
public Trigger(
@@ -178,6 +224,26 @@ public class Trigger {
this.expireActions = expireActions;
}
+ public Trigger(
+ int triggerId,
+ DateTime lastModifyTime,
+ DateTime submitTime,
+ String submitUser,
+ String source,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions) {
+ this.triggerId = triggerId;
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.source = source;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ this.expireActions = new ArrayList<TriggerAction>();
+ }
+
public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
Trigger.actionTypeLoader = loader;
}
@@ -268,6 +334,7 @@ public class Trigger {
jsonObj.put("triggerId", String.valueOf(triggerId));
jsonObj.put("status", status.toString());
jsonObj.put("info", info);
+ jsonObj.put("context", context);
return jsonObj;
}
@@ -316,7 +383,21 @@ public class Trigger {
int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
TriggerStatus status = TriggerStatus.valueOf((String)jsonObj.get("status"));
Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
- trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions, expireActions, info);
+ Map<String, Object> context = (Map<String, Object>) jsonObj.get("context");
+ for(ConditionChecker checker : triggerCond.getCheckers().values()) {
+ checker.setContext(context);
+ }
+ for(ConditionChecker checker : expireCond.getCheckers().values()) {
+ checker.setContext(context);
+ }
+ for(TriggerAction action : actions) {
+ action.setContext(context);
+ }
+ for(TriggerAction action : expireActions) {
+ action.setContext(context);
+ }
+
+ trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions, expireActions, info, context);
trigger.setResetOnExpire(resetOnExpire);
trigger.setResetOnTrigger(resetOnTrigger);
trigger.setStatus(status);
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index 85b6ad8..b186b7b 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -1,7 +1,11 @@
package azkaban.trigger;
+import java.util.Map;
+
public interface TriggerAction {
+ String getId();
+
String getType();
TriggerAction fromJson(Object obj) throws Exception;
@@ -9,6 +13,8 @@ public interface TriggerAction {
Object toJson();
void doAction() throws Exception;
+
+ void setContext(Map<String, Object> context);
String getDescription();
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
index c3e604b..7adf742 100644
--- a/src/java/azkaban/trigger/TriggerLoader.java
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -1,9 +1,6 @@
package azkaban.trigger;
import java.util.List;
-import java.util.Map;
-
-
public interface TriggerLoader {
src/java/azkaban/trigger/TriggerManager.java 53(+44 -9)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 460a75f..0b21166 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -37,7 +37,6 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
import azkaban.triggerapp.TriggerConnectorParams;
import azkaban.utils.JSONUtils;
@@ -51,7 +50,7 @@ import azkaban.utils.Props;
public class TriggerManager {
private static Logger logger = Logger.getLogger(TriggerManager.class);
- private static final String TRIGGER_SUFFIX = ".trigger";
+ public static final String TRIGGER_SUFFIX = ".trigger";
private TriggerLoader triggerLoader;
private CheckerTypeLoader checkerTypeLoader;
@@ -148,7 +147,11 @@ public class TriggerManager {
private void loadTriggers() throws TriggerManagerException {
List<Trigger> triggerList = triggerLoader.loadTriggers();
for(Trigger t : triggerList) {
- triggerIdMap.put(t.getTriggerId(), t);
+ if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+ removeTrigger(t, "azkaban");
+ } else {
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
}
}
@@ -158,14 +161,16 @@ public class TriggerManager {
public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
synchronized(t) {
+ logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
+ triggerIdMap.remove(t.getTriggerId());
}
}
-
public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
synchronized(t) {
try {
+ triggerLoader.updateTrigger(t);
callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
} catch(TriggerManagerException e) {
throw new TriggerManagerException(e);
@@ -186,6 +191,7 @@ public class TriggerManager {
String message = null;
logger.info("Inserting trigger into system. " );
// The trigger id is set by the loader. So it's unavailable until after this call.
+ t.setStatus(TriggerStatus.PREPARING);
triggerLoader.addTrigger(t);
try {
callTriggerServer(t, TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
@@ -202,8 +208,7 @@ public class TriggerManager {
private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
try {
- Map<String, Object> info = t.getInfo();
- return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), null, (Pair<String,String>[])null);
+ return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
} catch (IOException e) {
throw new TriggerManagerException(e);
}
@@ -338,15 +343,22 @@ public class TriggerManager {
try{
results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
// lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
+
List<Integer> updates = (List<Integer>) results.get("updates");
for(Integer update : updates) {
Trigger t = triggerLoader.loadTrigger(update);
lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
- triggerIdMap.put(update, t);
+
+ if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+ removeTrigger(t, "azkaban");
+ //triggerIdMap.remove(update);
+ } else {
+ triggerIdMap.put(update, t);
+ }
}
} catch (Exception e) {
- logger.error(e);
-
+ e.printStackTrace();
+ logger.error(e);
}
synchronized(this) {
@@ -493,6 +505,29 @@ public class TriggerManager {
removeTrigger(triggerIdMap.get(scheduleId), submitUser);
}
+ public Set<String> getAllActiveTriggerServerHosts() {
+ Set<String> hostport = new HashSet<String>();
+ hostport.add(triggerServerHost+":"+triggerServerPort);
+ return hostport;
+ }
+
+ public int getNumTriggers() {
+ return triggerIdMap.size();
+ }
+
+ public String getTriggerSources() {
+ Set<String> sources = new HashSet<String>();
+ for(Trigger t : triggerIdMap.values()) {
+ sources.add(t.getSource());
+ }
+ return sources.toString();
+ }
+
+ public String getTriggerIds() {
+ return triggerIdMap.keySet().toString();
+ }
+
+
}
diff --git a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
index c84198f..5ed1c41 100644
--- a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
+++ b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
@@ -4,11 +4,15 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.TimeZone;
import javax.management.MBeanInfo;
@@ -18,14 +22,15 @@ import javax.management.ObjectName;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Server;
-import org.mortbay.jetty.bio.SocketConnector;
-import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
+import azkaban.executor.ExecutorManager.Alerter;
+import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerRunnerManager;
import azkaban.project.JdbcProjectLoader;
@@ -36,8 +41,11 @@ import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.CreateTriggerAction;
-import azkaban.trigger.builtin.ExecutableFlowStatusChecker;
+import azkaban.trigger.builtin.ExecutionChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.KillExecutionAction;
+import azkaban.trigger.builtin.SlaAlertAction;
+import azkaban.trigger.builtin.SlaChecker;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -142,26 +150,42 @@ public class AzkabanTriggerServer {
logger.info("Loading project manager");
JdbcProjectLoader loader = new JdbcProjectLoader(props);
ProjectManager manager = new ProjectManager(loader, props);
-
return manager;
}
private void loadBuiltinCheckersAndActions(AzkabanTriggerServer app) {
logger.info("Loading built-in checker and action types");
- ExecutorManager executorManager = app.getExecutorManager();
- TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
+// ExecutorManager executorManager = app.getExecutorManager();
+// TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
CheckerTypeLoader checkerLoader = triggerRunnerManager.getCheckerLoader();
ActionTypeLoader actionLoader = triggerRunnerManager.getActionLoader();
// time:
checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
- ExecutableFlowStatusChecker.setExecutorManager(executorManager);
- checkerLoader.registerCheckerType(ExecutableFlowStatusChecker.type, ExecutableFlowStatusChecker.class);
+// // execution checker
+// ExecutionChecker.setExecutorManager(executorManager);
+// checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
+ // Sla checker
+ SlaChecker.setExecutorManager(executorManager);
+ checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
+ // execut flow action
ExecuteFlowAction.setExecutorManager(executorManager);
ExecuteFlowAction.setProjectManager(projectManager);
+ ExecuteFlowAction.setTriggerRunnerManager(triggerRunnerManager);
actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ // kill flow action
+ KillExecutionAction.setExecutorManager(executorManager);
+ actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+ // sla alert
+ SlaAlertAction.setExecutorManager(executorManager);
+ Map<String, Alerter> alerters = loadAlerters(props);
+ SlaAlertAction.setAlerters(alerters);
+ SlaAlertAction.setExecutorManager(executorManager);
+ actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+ // create trigger action
CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+
}
private void loadPluginCheckersAndActions(String pluginPath, AzkabanTriggerServer app) {
@@ -291,6 +315,143 @@ public class AzkabanTriggerServer {
}
}
+ private Map<String, Alerter> loadAlerters(Props props) {
+ Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+ // load built-in alerters
+ ExecutorMailer mailAlerter = new ExecutorMailer(props);
+ allAlerters.put("email", mailAlerter);
+ // load all plugin alerters
+ String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+ allAlerters.putAll(loadPluginAlerters(pluginDir));
+ return allAlerters;
+ }
+
+ private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+ File alerterPluginPath = new File(pluginPath);
+ if (!alerterPluginPath.exists()) {
+ return Collections.<String, Alerter>emptyMap();
+ }
+
+ Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+ ClassLoader parentLoader = SlaAlertAction.class.getClass().getClassLoader();
+ File[] pluginDirs = alerterPluginPath.listFiles();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (File pluginDir: pluginDirs) {
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ File propertiesFile = new File(propertiesDir, "plugin.properties");
+ File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+
+ if (propertiesFile.exists()) {
+ if (propertiesOverrideFile.exists()) {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+ }
+ else {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile);
+ }
+ }
+ else {
+ logger.error("Plugin conf file " + propertiesFile + " not found.");
+ continue;
+ }
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ String pluginName = pluginProps.getString("alerter.name");
+ List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+
+ String pluginClass = pluginProps.getString("alerter.class");
+ if (pluginClass == null) {
+ logger.error("Alerter class is not set.");
+ }
+ else {
+ logger.info("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (int i=0; i < files.length; ++i) {
+ try {
+ URL url = files[i].toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ if (extLibClasspath != null) {
+ for (String extLib : extLibClasspath) {
+ try {
+ File file = new File(pluginDir, extLib);
+ URL url = file.toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> alerterClass = null;
+ try {
+ alerterClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+
+ String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+ Constructor<?> constructor = null;
+ try {
+ constructor = alerterClass.getConstructor(Props.class);
+ } catch (NoSuchMethodException e) {
+ logger.error("Constructor not found in " + pluginClass);
+ continue;
+ }
+
+ Object obj = null;
+ try {
+ obj = constructor.newInstance(pluginProps);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+
+ if (!(obj instanceof Alerter)) {
+ logger.error("The object is not an Alerter");
+ continue;
+ }
+
+ Alerter plugin = (Alerter) obj;
+ installedAlerterPlugins.put(pluginName, plugin);
+ }
+
+ return installedAlerterPlugins;
+
+ }
+
private TriggerLoader createTriggerLoader(Props props) {
return new JdbcTriggerLoader(props);
}
@@ -417,6 +578,7 @@ public class AzkabanTriggerServer {
registerMbean("triggerServerJetty", new JmxJettyServer(server));
registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(triggerRunnerManager));
+ registerMbean("executorManager", new JmxExecutorManager(executorManager));
}
public void close() {
diff --git a/src/java/azkaban/triggerapp/TriggerRunnerManager.java b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
index b4e5fe7..9a7d62d 100644
--- a/src/java/azkaban/triggerapp/TriggerRunnerManager.java
+++ b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
@@ -4,11 +4,12 @@ import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
@@ -39,6 +40,7 @@ public class TriggerRunnerManager {
private final TriggerScannerThread runnerThread;
private long lastRunnerThreadCheckTime = -1;
+ private long runnerThreadIdleTime = -1;
public TriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
@@ -117,28 +119,23 @@ public class TriggerRunnerManager {
}
public synchronized void updateTrigger(int triggerId) throws TriggerManagerException {
- Trigger t = triggerIdMap.get(triggerId);
- if(t == null) {
- throw new TriggerManagerException("The trigger to update doesn't exist!");
- }
- runnerThread.deleteTrigger(t);
- runnerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
- triggerLoader.updateTrigger(t);
+ Trigger t = triggerIdMap.get(triggerId);
+
+ updateTrigger(t);
}
public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
if(!triggerIdMap.containsKey(t.getTriggerId())) {
throw new TriggerManagerException("The trigger to update doesn't exist!");
}
-
runnerThread.deleteTrigger(t);
- runnerThread.addTrigger(t);
- triggerIdMap.put(t.getTriggerId(), t);
-
- triggerLoader.updateTrigger(t);
+
+ Trigger t2 = triggerLoader.loadTrigger(t.getTriggerId());
+ runnerThread.addTrigger(t2);
+ triggerIdMap.put(t2.getTriggerId(), t2);
+
}
public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
@@ -205,11 +202,11 @@ public class TriggerRunnerManager {
logger.error(t.getMessage());
}
- long timeRemaining = scannerInterval - (System.currentTimeMillis() - getLastRunnerThreadCheckTime());
- if(timeRemaining < 0) {
+ runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - getLastRunnerThreadCheckTime());
+ if(runnerThreadIdleTime < 0) {
logger.error("Trigger manager thread " + this.getName() + " is too busy!");
} else {
- wait(timeRemaining);
+ wait(runnerThreadIdleTime);
}
} catch(InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
@@ -221,6 +218,7 @@ public class TriggerRunnerManager {
private void checkAllTriggers() throws TriggerManagerException {
for(Trigger t : triggers) {
+ logger.info("Checking trigger " + t.getDescription());
if(t.getStatus().equals(TriggerStatus.READY)) {
if(t.triggerConditionMet()) {
onTriggerTrigger(t);
@@ -235,22 +233,23 @@ public class TriggerRunnerManager {
List<TriggerAction> actions = t.getTriggerActions();
for(TriggerAction action : actions) {
try {
+ logger.info("Doing trigger actions");
action.doAction();
} catch (Exception e) {
// TODO Auto-generated catch block
- throw new TriggerManagerException("action failed to execute", e);
+ //throw new TriggerManagerException("action failed to execute", e);
+ logger.error("Failed to do action " + action.getDescription(), e);
+ } catch (Throwable th) {
+ logger.error("Failed to do action " + action.getDescription(), th);
}
}
if(t.isResetOnTrigger()) {
t.resetTriggerConditions();
t.resetExpireCondition();
-// updateTrigger(t);
} else {
t.setStatus(TriggerStatus.EXPIRED);
}
-
triggerLoader.updateTrigger(t);
-
// updateAgent(t);
}
@@ -258,10 +257,14 @@ public class TriggerRunnerManager {
List<TriggerAction> expireActions = t.getExpireActions();
for(TriggerAction action : expireActions) {
try {
+ logger.info("Doing expire actions");
action.doAction();
} catch (Exception e) {
// TODO Auto-generated catch block
- throw new TriggerManagerException("expire action failed to execute", e);
+ //throw new TriggerManagerException("action failed to execute", e);
+ logger.error("Failed to do expire action " + action.getDescription(), e);
+ } catch (Throwable th) {
+ logger.error("Failed to do expire action " + action.getDescription(), th);
}
}
if(t.isResetOnExpire()) {
@@ -271,7 +274,6 @@ public class TriggerRunnerManager {
} else {
t.setStatus(TriggerStatus.EXPIRED);
}
-// updateAgent(t);
triggerLoader.updateTrigger(t);
}
}
@@ -338,4 +340,24 @@ public class TriggerRunnerManager {
}
}
+ public int getNumTriggers() {
+ return triggerIdMap.size();
+ }
+
+ public String getTriggerSources() {
+ Set<String> sources = new HashSet<String>();
+ for(Trigger t : triggerIdMap.values()) {
+ sources.add(t.getSource());
+ }
+ return sources.toString();
+ }
+
+ public String getTriggerIds() {
+ return triggerIdMap.keySet().toString();
+ }
+
+ public long getScannerIdleTime() {
+ return runnerThreadIdleTime;
+ }
+
}
diff --git a/src/java/azkaban/utils/AbstractMailer.java b/src/java/azkaban/utils/AbstractMailer.java
index 85f589c..721675a 100644
--- a/src/java/azkaban/utils/AbstractMailer.java
+++ b/src/java/azkaban/utils/AbstractMailer.java
@@ -48,6 +48,10 @@ public class AbstractMailer {
return message;
}
+ public EmailMessage prepareEmailMessage(String subject, String mimetype, Collection<String> emailList) {
+ return createEmailMessage(subject, mimetype, emailList);
+ }
+
public String getAzkabanName() {
return azkabanName;
}
src/java/azkaban/webapp/AzkabanWebServer.java 69(+56 -13)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index fdfbed3..39c105c 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -55,9 +55,11 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
+import azkaban.executor.ExecutorManager.Alerter;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxScheduler;
+import azkaban.jmx.JmxTriggerManager;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
@@ -65,11 +67,20 @@ import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.KillExecutionAction;
+import azkaban.trigger.builtin.SlaAlertAction;
+import azkaban.trigger.builtin.SlaChecker;
+import azkaban.triggerapp.AzkabanTriggerServer;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -167,17 +178,15 @@ public class AzkabanWebServer extends AzkabanServer {
this.server = server;
velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
sessionCache = new SessionCache(props);
- userManager = loadUserManager(props);
- executorManager = loadExecutorManager(props);
+ userManager = loadUserManager(props);
triggerManager = loadTriggerManager(props);
-
- projectManager = loadProjectManager(props);
-
-// scheduler = loadScheduler(executorManager, projectManager, triggerManager);
-
+ executorManager = loadExecutorManager(props);
+ projectManager = loadProjectManager(props, triggerManager);
scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
+ loadBuiltinCheckersAndActions();
+
baseClassLoader = getBaseClassloader();
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -225,12 +234,12 @@ public class AzkabanWebServer extends AzkabanServer {
return manager;
}
- private ProjectManager loadProjectManager(Props props) {
+ private ProjectManager loadProjectManager(Props props, TriggerManager tm) {
logger.info("Loading JDBC for project management");
JdbcProjectLoader loader = new JdbcProjectLoader(props);
ProjectManager manager = new ProjectManager(loader, props);
- manager.setTriggerManager(triggerManager);
+ manager.setTriggerManager(tm);
return manager;
}
@@ -241,7 +250,7 @@ public class AzkabanWebServer extends AzkabanServer {
return execManager;
}
- private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager triggerManager, Props props ) throws Exception {
+ private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager tm, Props props ) throws Exception {
ScheduleManager schedManager = null;
String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
@@ -251,7 +260,7 @@ public class AzkabanWebServer extends AzkabanServer {
schedManager.start();
} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
logger.info("Loading trigger based scheduler");
- ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, null, ScheduleManager.triggerSource);
+ ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
schedManager = new ScheduleManager(executorManager, loader, true);
}
@@ -268,6 +277,39 @@ public class AzkabanWebServer extends AzkabanServer {
return new TriggerManager(props, loader);
}
+ private void loadBuiltinCheckersAndActions() {
+ logger.info("Loading built-in checker and action types");
+// ExecutorManager executorManager = app.getExecutorManager();
+// TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
+ CheckerTypeLoader checkerLoader = triggerManager.getCheckerLoader();
+ ActionTypeLoader actionLoader = triggerManager.getActionLoader();
+ // time:
+ checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+// // execution checker
+// ExecutionChecker.setExecutorManager(executorManager);
+// checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
+ // Sla checker
+// SlaChecker.setExecutorManager(executorManager);
+ checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
+
+ // execut flow action
+// ExecuteFlowAction.setExecutorManager(executorManager);
+// ExecuteFlowAction.setProjectManager(projectManager);
+ actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ // kill flow action
+// KillExecutionAction.setExecutorManager(executorManager);
+ actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+ // sla alert
+// SlaAlertAction.setExecutorManager(executorManager);
+// Map<String, Alerter> alerters = loadAlerters(props);
+// SlaAlertAction.setAlerters(alerters);
+ actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+ // create trigger action
+// CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
+ actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+
+ }
+
/**
* Returns the web session cache.
*
@@ -527,7 +569,8 @@ public class AzkabanWebServer extends AzkabanServer {
private static Map<String, TriggerPlugin> loadTriggerPlugins(Context root, String pluginPath, AzkabanWebServer azkabanWebApp) {
File triggerPluginPath = new File(pluginPath);
if (!triggerPluginPath.exists()) {
- return Collections.<String, TriggerPlugin>emptyMap();
+ //return Collections.<String, TriggerPlugin>emptyMap();
+ return new HashMap<String, TriggerPlugin>();
}
Map<String, TriggerPlugin> installedTriggerPlugins = new HashMap<String, TriggerPlugin>();
@@ -895,7 +938,7 @@ public class AzkabanWebServer extends AzkabanServer {
mbeanServer = ManagementFactory.getPlatformMBeanServer();
registerMbean("jetty", new JmxJettyServer(server));
- registerMbean("scheduler", new JmxScheduler(scheduleManager));
+ registerMbean("triggerManager", new JmxTriggerManager(triggerManager));
registerMbean("executorManager", new JmxExecutorManager(executorManager));
}
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 75982d1..2d4dff5 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -17,6 +17,8 @@ import org.apache.log4j.Logger;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutorManager;
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.trigger.TriggerManager;
import azkaban.user.Permission;
import azkaban.user.Role;
import azkaban.user.User;
@@ -38,6 +40,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
private UserManager userManager;
private AzkabanWebServer server;
private ExecutorManager executorManager;
+ private TriggerManager triggerManager;
@Override
public void init(ServletConfig config) throws ServletException {
@@ -46,6 +49,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
server = (AzkabanWebServer)getApplication();
userManager = server.getUserManager();
executorManager = server.getExecutorManager();
+ triggerManager = server.getTriggerManager();
}
@Override
@@ -71,6 +75,17 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
Map<String, Object> result = executorManager.callExecutorJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
ret = result;
}
+ else if (TriggerConnectorParams.JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES.equals(ajax)) {
+ if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
+ ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
+ this.writeJSON(resp, ret, true);
+ return;
+ }
+ String hostPort = getParam(req, JMX_HOSTPORT);
+ String mbean = getParam(req, JMX_MBEAN);
+ Map<String, Object> result = triggerManager.callTriggerServerJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
+ ret = result;
+ }
else if (JMX_GET_MBEANS.equals(ajax)) {
ret.put("mbeans", server.getMbeanNames());
}
@@ -177,7 +192,28 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
}
}
- page.add("remoteMBeans", executorMBeans);
+ page.add("executorRemoteMBeans", executorMBeans);
+
+ Map<String, Object> triggerserverMBeans = new HashMap<String,Object>();
+ Set<String> primaryTriggerServerHosts = triggerManager.getPrimaryServerHosts();
+ for (String hostPort: triggerManager.getAllActiveTriggerServerHosts()) {
+ try {
+ Map<String, Object> mbeans = triggerManager.callTriggerServerJMX(hostPort, TriggerConnectorParams.JMX_GET_MBEANS, null);
+
+ if (primaryTriggerServerHosts.contains(hostPort)) {
+ triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+ }
+ else {
+ triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+ }
+ }
+ catch (IOException e) {
+ logger.error("Cannot contact executor " + hostPort, e);
+ }
+ }
+
+ page.add("triggerserverRemoteMBeans", triggerserverMBeans);
+
page.render();
}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 352(+183 -169)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 73fefdb..1ef2cef 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -54,6 +54,7 @@ import azkaban.scheduler.Schedule;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.ScheduleManagerException;
import azkaban.scheduler.ScheduleStatisticManager;
+import azkaban.sla.SlaOption;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
@@ -95,12 +96,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
HashMap<String, Object> ret = new HashMap<String, Object>();
String ajaxName = getParam(req, "ajax");
-// if (ajaxName.equals("slaInfo")) {
-// ajaxSlaInfo(req, ret, session.getUser());
-// }
-// else if(ajaxName.equals("setSla")) {
-// ajaxSetSla(req, ret, session.getUser());
-// }
+ if (ajaxName.equals("slaInfo")) {
+ ajaxSlaInfo(req, ret, session.getUser());
+ }
+ else if(ajaxName.equals("setSla")) {
+ ajaxSetSla(req, ret, session.getUser());
+ } else
if(ajaxName.equals("loadFlow")) {
ajaxLoadFlows(req, ret, session.getUser());
}
@@ -117,98 +118,110 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
-// private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-// try {
-//
-// int scheduleId = getIntParam(req, "scheduleId");
-//
-// Schedule sched = scheduleManager.getSchedule(scheduleId);
-//
-// Project project = projectManager.getProject(sched.getProjectId());
-// if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
-// ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
-// return;
-// }
-//
-//
-// SlaOptions slaOptions= new SlaOptions();
-//
-// String slaEmails = getParam(req, "slaEmails");
-// String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-//
-// Map<String, String> settings = getParamGroup(req, "settings");
-// List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
-// for(String set : settings.keySet()) {
-// SlaSetting s;
-// try {
-// s = parseSlaSetting(settings.get(set));
-// }
-// catch (Exception e) {
-// throw new ServletException(e);
-// }
-// if(s != null) {
-// slaSettings.add(s);
-// }
-// }
-//
-// if(slaSettings.size() > 0) {
-// if(slaEmails.equals("")) {
-// ret.put("error", "Please put correct email settings for your SLA actions");
-// return;
-// }
-// slaOptions.setSlaEmails(Arrays.asList(emailSplit));
-// slaOptions.setSettings(slaSettings);
-// }
-// else {
-// slaOptions = null;
-// }
-// sched.setSlaOptions(slaOptions);
-// scheduleManager.insertSchedule(sched);
-//
-// if(slaOptions != null) {
-// projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
-// }
-//
-// } catch (ServletException e) {
-// ret.put("error", e.getMessage());
-// } catch (ScheduleManagerException e) {
-// ret.put("error", e.getMessage());
-// }
-//
-// }
+ private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+ try {
+
+ int scheduleId = getIntParam(req, "scheduleId");
+
+ Schedule sched = scheduleManager.getSchedule(scheduleId);
+
+ Project project = projectManager.getProject(sched.getProjectId());
+ if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
+ ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
+ return;
+ }
+
+ String emailStr = getParam(req, "slaEmails");
+ String[] emailSplit = emailStr.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ List<String> slaEmails = Arrays.asList(emailSplit);
+
+ Map<String, String> settings = getParamGroup(req, "settings");
+
+ List<SlaOption> slaOptions = new ArrayList<SlaOption>();
+ for(String set : settings.keySet()) {
+ SlaOption sla;
+ try {
+ sla = parseSlaSetting(settings.get(set));
+ sla.getInfo().put(SlaOption.INFO_FLOW_NAME, sched.getFlowName());
+ sla.getInfo().put(SlaOption.INFO_EMAIL_LIST, slaEmails);
+ }
+ catch (Exception e) {
+ throw new ServletException(e);
+ }
+ if(sla != null) {
+ sla.getInfo().put("SlaEmails", slaEmails);
+ slaOptions.add(sla);
+ }
+ }
+
+ sched.setSlaOptions(slaOptions);
+ scheduleManager.insertSchedule(sched);
+
+ if(slaOptions != null) {
+ projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
+ }
+
+ } catch (ServletException e) {
+ ret.put("error", e.getMessage());
+ } catch (ScheduleManagerException e) {
+ ret.put("error", e.getMessage());
+ }
+
+ }
-// private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
-// // "" + Duration + EmailAction + KillAction
-// String[] parts = set.split(",", -1);
-// String id = parts[0];
-// String rule = parts[1];
-// String duration = parts[2];
-// String emailAction = parts[3];
-// String killAction = parts[4];
-// if(emailAction.equals("true") || killAction.equals("true")) {
-// SlaSetting r = new SlaSetting();
-// r.setId(id);
-// r.setRule(SlaRule.valueOf(rule));
-// ReadablePeriod dur;
-// try {
-// dur = parseDuration(duration);
-// }
-// catch (Exception e) {
-// throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
-// }
-// r.setDuration(dur);
-// List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
-// if(emailAction.equals("true")) {
-// actions.add(SlaAction.EMAIL);
-// }
-// if(killAction.equals("true")) {
-// actions.add(SlaAction.KILL);
-// }
-// r.setActions(actions);
-// return r;
-// }
-// return null;
-// }
+ private SlaOption parseSlaSetting(String set) throws ScheduleManagerException {
+ // "" + Duration + EmailAction + KillAction
+ logger.info("Tryint to set sla with the following set: " + set);
+
+ String slaType;
+ List<String> slaActions = new ArrayList<String>();
+ Map<String, Object> slaInfo = new HashMap<String, Object>();
+ String[] parts = set.split(",", -1);
+ String id = parts[0];
+ String rule = parts[1];
+ String duration = parts[2];
+ String emailAction = parts[3];
+ String killAction = parts[4];
+ if(emailAction.equals("true") || killAction.equals("true")) {
+ //String type = id.equals("") ? SlaOption.RULE_FLOW_RUNTIME_SLA : SlaOption.RULE_JOB_RUNTIME_SLA ;
+ if(emailAction.equals("true")) {
+ slaActions.add(SlaOption.ACTION_ALERT);
+ slaInfo.put(SlaOption.ALERT_TYPE, "email");
+ }
+ if(killAction.equals("true")) {
+ slaActions.add(SlaOption.ACTION_CANCEL_FLOW);
+ }
+ if(id.equals("")) {
+ if(rule.equals("SUCCESS")) {
+ slaType = SlaOption.TYPE_FLOW_SUCCEED;
+ }
+ else {
+ slaType = SlaOption.TYPE_FLOW_FINISH;
+ }
+ } else {
+ slaInfo.put(SlaOption.INFO_JOB_NAME, id);
+ if(rule.equals("SUCCESS")) {
+ slaType = SlaOption.TYPE_JOB_SUCCEED;
+ } else {
+ slaType = SlaOption.TYPE_JOB_FINISH;
+ }
+ }
+
+ ReadablePeriod dur;
+ try {
+ dur = parseDuration(duration);
+ }
+ catch (Exception e) {
+ throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
+ }
+
+ slaInfo.put(SlaOption.INFO_DURATION, Utils.createPeriodString(dur));
+ SlaOption r = new SlaOption(slaType, slaActions, slaInfo);
+ logger.info("Parsing sla as id:" + id + " type:" + slaType + " rule:" + rule + " Duration:" + duration + " actions:" + slaActions);
+ return r;
+ }
+ return null;
+ }
private ReadablePeriod parseDuration(String duration) {
int hour = Integer.parseInt(duration.split(":")[0]);
@@ -216,77 +229,76 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return Minutes.minutes(min+hour*60).toPeriod();
}
-// private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-// int scheduleId;
-// try {
-// scheduleId = getIntParam(req, "scheduleId");
-//
-// Schedule sched = scheduleManager.getSchedule(scheduleId);
-//
-// Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
-// if (project == null) {
-// ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
-// return;
-// }
-//
-// Flow flow = project.getFlow(sched.getFlowName());
-// if (flow == null) {
-// ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
-// return;
-// }
-//
-// SlaOptions slaOptions = sched.getSlaOptions();
-// ExecutionOptions flowOptions = sched.getExecutionOptions();
-//
-// if(slaOptions != null) {
-// ret.put("slaEmails", slaOptions.getSlaEmails());
-// List<SlaSetting> settings = slaOptions.getSettings();
-// List<Object> setObj = new ArrayList<Object>();
-// for(SlaSetting set: settings) {
-// setObj.add(set.toObject());
-// }
-// ret.put("settings", setObj);
-// }
-// else if (flowOptions != null) {
-// if(flowOptions.getFailureEmails() != null) {
-// List<String> emails = flowOptions.getFailureEmails();
-// if(emails.size() > 0) {
-// ret.put("slaEmails", emails);
-// }
-// }
-// }
-// else {
-// if(flow.getFailureEmails() != null) {
-// List<String> emails = flow.getFailureEmails();
-// if(emails.size() > 0) {
-// ret.put("slaEmails", emails);
-// }
-// }
-// }
-//
-// List<String> disabledJobs;
-// if(flowOptions != null) {
-// disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
-// }
-// else {
-// disabledJobs = new ArrayList<String>();
-// }
-//
-// List<String> allJobs = new ArrayList<String>();
-// for(Node n : flow.getNodes()) {
-// if(!disabledJobs.contains(n.getId())) {
-// allJobs.add(n.getId());
-// }
-// }
-// ret.put("allJobNames", allJobs);
-// } catch (ServletException e) {
-// ret.put("error", e);
-// } catch (ScheduleManagerException e) {
-// // TODO Auto-generated catch block
-// ret.put("error", e);
-// }
-//
-// }
+ private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+ int scheduleId;
+ try {
+ scheduleId = getIntParam(req, "scheduleId");
+
+ Schedule sched = scheduleManager.getSchedule(scheduleId);
+
+ Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
+ if (project == null) {
+ ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
+ return;
+ }
+
+ Flow flow = project.getFlow(sched.getFlowName());
+ if (flow == null) {
+ ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
+ return;
+ }
+
+ List<SlaOption> slaOptions = sched.getSlaOptions();
+ ExecutionOptions flowOptions = sched.getExecutionOptions();
+
+ if(slaOptions != null && slaOptions.size() > 0) {
+ ret.put("slaEmails", slaOptions.get(0).getInfo().get("SlaEmails"));
+
+ List<Object> setObj = new ArrayList<Object>();
+ for(SlaOption sla: slaOptions) {
+ setObj.add(sla.toWebObject());
+ }
+ ret.put("settings", setObj);
+ }
+ else if (flowOptions != null) {
+ if(flowOptions.getFailureEmails() != null) {
+ List<String> emails = flowOptions.getFailureEmails();
+ if(emails.size() > 0) {
+ ret.put("slaEmails", emails);
+ }
+ }
+ }
+ else {
+ if(flow.getFailureEmails() != null) {
+ List<String> emails = flow.getFailureEmails();
+ if(emails.size() > 0) {
+ ret.put("slaEmails", emails);
+ }
+ }
+ }
+
+ List<String> disabledJobs;
+ if(flowOptions != null) {
+ disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+ }
+ else {
+ disabledJobs = new ArrayList<String>();
+ }
+
+ List<String> allJobs = new ArrayList<String>();
+ for(Node n : flow.getNodes()) {
+ if(!disabledJobs.contains(n.getId())) {
+ allJobs.add(n.getId());
+ }
+ }
+ ret.put("allJobNames", allJobs);
+ } catch (ServletException e) {
+ ret.put("error", e);
+ } catch (ScheduleManagerException e) {
+ ret.put("error", e);
+ }
+
+ }
protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
Project project = projectManager.getProject(projectId);
@@ -614,7 +626,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("error", e.getMessage());
}
- Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions);
+ List<SlaOption> slaOptions = null;
+
+ Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
diff --git a/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm b/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm
index 2a0e945..efa5c60 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm
@@ -89,7 +89,7 @@
</tbody>
</table>
-#foreach($executor in $remoteMBeans.entrySet())
+#foreach($executor in $executorRemoteMBeans.entrySet())
<h3 class="subhead">Remote Executor JMX $executor.key</h3>
<table class="all-jobs job-table remoteJMX">
<thead>
@@ -131,6 +131,50 @@
</tbody>
</table>
#end
+
+#foreach($triggerserver in $triggerserverRemoteMBeans.entrySet())
+ <h3 class="subhead">Remote Trigger Server JMX $triggerserver.key</h3>
+ <table class="all-jobs job-table remoteJMX">
+ <thead>
+ <tr>
+ <th>Name</th>
+ <th>Domain</th>
+ <th>Canonical Name</th>
+ <th></th>
+ </tr>
+ </thead>
+ <tbody>
+ #foreach($bean in $triggerserver.value)
+ <tr>
+ <td>${bean.get("keyPropertyList").get("name")}</td>
+ <td>${bean.get("domain")}</td>
+ <td>${bean.get("canonicalName")}</td>
+ <td><div class="btn4 querybtn" id="expandBtn-$counter" domain="${bean.get("domain")}" name="${bean.get("keyPropertyList").get("name")}" hostport="$triggerserver.key">Query</div></td>
+ </tr>
+ <tr class="childrow" id="expandBtn-${counter}-child" style="display: none;">
+ <td class="expandedFlow" colspan="3">
+ <table class="innerTable">
+ <thead>
+ <tr>
+ <th>Attribute Name</th>
+ <th>Value</th>
+ </tr>
+ </thead>
+ <tbody id="expandBtn-${counter}-tbody">
+ </tbody>
+ </table>
+ </td>
+
+ <td>
+ <div class="btn4 collapse">Collapse</div>
+ </td>
+ </tr>
+ #set($counter=$counter + 1)
+ #end
+ </tbody>
+ </table>
+#end
+
</div>
</body>
</html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm b/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
index bf62ca4..6ccb348 100644
--- a/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
@@ -72,7 +72,8 @@
<th>Source</th>
<th>Submitted By</th>
<th>Description</th>
- <th colspan="2" class="action ignoresort">Action</th>
+ <th>Status</th>
+ <!--th colspan="2" class="action ignoresort">Action</th-->
</tr>
</thead>
<tbody>
@@ -84,7 +85,8 @@
<td>${trigger.source}</td>
<td>${trigger.submitUser}</td>
<td>${trigger.getDescription()}</td>
- <td><button id="expireTriggerBtn" onclick="expireTrigger(${trigger.triggerId})" >Expire Trigger</button></td>
+ <td>${trigger.getStatus()}</td>
+ <!--td><button id="expireTriggerBtn" onclick="expireTrigger(${trigger.triggerId})" >Expire Trigger</button></td-->
</tr>
#end
#else
src/web/js/azkaban.scheduled.view.js 4(+0 -4)
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index f507dc1..6d8c039 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -107,10 +107,6 @@ azkaban.ChangeSlaView = Backbone.View.extend({
indexToText[i] = "job " + allJobNames[i-1];
}
-
-
-
-
// populate with existing settings
if(data.settings) {
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 6365fd0..0d746d8 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -113,12 +113,12 @@ public class JdbcScheduleLoaderTest {
flowOptions.setDisabledJobs(disabledJobs);
- Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
- Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
- Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
- Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
- Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
- Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+ Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+ Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+ Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+ Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+ Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
loader.insertSchedule(s1);
loader.insertSchedule(s2);
@@ -163,13 +163,13 @@ public class JdbcScheduleLoaderTest {
flowOptions.setDisabledJobs(disabledJobs);
System.out.println("the flow options are " + flowOptions);
- Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
loader.insertSchedule(s1);
emails.add("email3");
- Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "azkaban", flowOptions);
+ Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "azkaban", flowOptions, null);
loader.updateSchedule(s2);
@@ -208,7 +208,7 @@ public class JdbcScheduleLoaderTest {
flowOptions.setFailureEmails(emails);
flowOptions.setDisabledJobs(disabledJobs);
- Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+ Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
schedules.add(s);
try {
loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/trigger/DummyTriggerAction.java b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
index a99b3f7..cffbed6 100644
--- a/unit/java/azkaban/test/trigger/DummyTriggerAction.java
+++ b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
@@ -1,5 +1,7 @@
package azkaban.test.trigger;
+import java.util.Map;
+
import azkaban.trigger.TriggerAction;
public class DummyTriggerAction implements TriggerAction{
@@ -40,4 +42,15 @@ public class DummyTriggerAction implements TriggerAction{
return "this is real dummy action";
}
+ @Override
+ public String getId() {
+ return null;
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // TODO Auto-generated method stub
+
+ }
+
}
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index 962884c..4f1bb09 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -27,7 +27,7 @@ public class ExecuteFlowActionTest {
List<String> disabledJobs = new ArrayList<String>();
options.setDisabledJobs(disabledJobs);
- ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testproject", "testflow", "azkaban", options);
+ ExecuteFlowAction executeFlowAction = new ExecuteFlowAction("ExecuteFlowAction", 1, "testproject", "testflow", "azkaban", options, null);
Object obj = executeFlowAction.toJson();
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index 98bd5ae..9a06f56 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -194,7 +194,7 @@ public class JdbcTriggerLoaderTest {
Condition triggerCond = new Condition(checkers1, expr1);
Condition expireCond = new Condition(checkers1, expr1);
List<TriggerAction> actions = new ArrayList<TriggerAction>();
- TriggerAction action = new ExecuteFlowAction(1, projName, flowName, "azkaban", new ExecutionOptions());
+ TriggerAction action = new ExecuteFlowAction("executeAction", 1, projName, flowName, "azkaban", new ExecutionOptions(), null);
actions.add(action);
Trigger t = new Trigger(now, now, "azkaban", source, triggerCond, expireCond, actions);
return t;
diff --git a/unit/java/azkaban/test/trigger/ThresholdChecker.java b/unit/java/azkaban/test/trigger/ThresholdChecker.java
index 801a134..c25f566 100644
--- a/unit/java/azkaban/test/trigger/ThresholdChecker.java
+++ b/unit/java/azkaban/test/trigger/ThresholdChecker.java
@@ -1,5 +1,7 @@
package azkaban.test.trigger;
+import java.util.Map;
+
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -87,5 +89,11 @@ public class ThresholdChecker implements ConditionChecker{
}
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // TODO Auto-generated method stub
+
+ }
+
}
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 8629edd..790fcb4 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -52,7 +52,7 @@ public class TriggerTest {
Condition triggerCond = new Condition(checkers1, expr1);
Condition expireCond = new Condition(checkers1, expr1);
List<TriggerAction> actions = new ArrayList<TriggerAction>();
- TriggerAction action = new ExecuteFlowAction(1, "testProj", "testFlow", "azkaban", new ExecutionOptions());
+ TriggerAction action = new ExecuteFlowAction("executeAction", 1, "testProj", "testFlow", "azkaban", new ExecutionOptions(), null);
actions.add(action);
Trigger t = new Trigger(now, now, "azkaban", "test", triggerCond, expireCond, actions);