azkaban-uncached
Changes
src/java/azkaban/scheduler/Schedule.java 260(+225 -35)
src/java/azkaban/scheduler/ScheduleManager.java 98(+52 -46)
src/java/azkaban/sla/JdbcSLALoader.java 330(+330 -0)
src/java/azkaban/sla/SLA.java 252(+252 -0)
src/java/azkaban/sla/SLALoader.java 14(+14 -0)
src/java/azkaban/sla/SlaMailer.java 228(+228 -0)
src/java/azkaban/sla/SLAManager.java 467(+467 -0)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 437(+289 -148)
src/java/azkaban/webapp/servlet/velocity/flowpage.vm 183(+23 -160)
src/sql/create_sla_table.sql 10(+10 -0)
src/web/css/azkaban.css 280(+275 -5)
src/web/js/azkaban.exflow.options.view.js 453(+16 -437)
src/web/js/azkaban.flow.view.js 85(+2 -83)
src/web/js/azkaban.schedule.options.view.js 586(+586 -0)
src/web/js/azkaban.scheduled.view.js 260(+164 -96)
unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java 164(+106 -58)
unit/java/azkaban/test/sla/JdbcSLALoaderTest.java 182(+182 -0)
Details
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fe807d3..dcef504 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -569,7 +569,7 @@ public class ExecutorManager {
return flow;
}
- private boolean isFinished(ExecutableFlow flow) {
+ public boolean isFinished(ExecutableFlow flow) {
switch(flow.getStatus()) {
case SUCCEEDED:
case FAILED:
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 976543a..ee9c812 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -38,6 +38,8 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.sla.SLA;
import azkaban.sla.SLAManagerException;
import azkaban.sla.JdbcSLALoader.EncodingType;
@@ -200,7 +202,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
- String json = JSONUtils.toJSON(s.optionToObject());
+ String json = JSONUtils.toJSON(s.optionsToObject());
byte[] data = null;
try {
byte[] stringData = json.getBytes("UTF-8");
@@ -212,7 +214,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
}
catch (IOException e) {
- throw new ScheduleManagerException("Error encoding the schedule options" + s.getSchedOptions());
+ throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
}
QueryRunner runner = new QueryRunner(dataSource);
@@ -262,7 +264,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
- String json = JSONUtils.toJSON(s.optionToObject());
+ String json = JSONUtils.toJSON(s.optionsToObject());
byte[] data = null;
try {
byte[] stringData = json.getBytes("UTF-8");
@@ -274,7 +276,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
}
catch (IOException e) {
- throw new ScheduleManagerException("Error encoding the schedule options" + s.getSchedOptions());
+ throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
}
QueryRunner runner = new QueryRunner(dataSource);
@@ -326,7 +328,8 @@ public class JdbcScheduleLoader implements ScheduleLoader {
int encodingType = rs.getInt(12);
byte[] data = rs.getBytes(13);
- Map<String, Object> options = null;
+ FlowOptions flowOptions = null;
+ SlaOptions slaOptions = null;
if (data != null) {
EncodingType encType = EncodingType.fromInteger(encodingType);
Object optsObj;
@@ -341,13 +344,14 @@ public class JdbcScheduleLoader implements ScheduleLoader {
String jsonString = new String(data, "UTF-8");
optsObj = JSONUtils.parseJSONFromString(jsonString);
}
- options = Schedule.createScheduleOptionFromObject(optsObj);
+ flowOptions = Schedule.createFlowOptionFromObject(optsObj);
+ slaOptions = Schedule.createSlaOptionFromObject(optsObj);
} catch (IOException e) {
throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
}
}
- Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, options);
+ Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
schedules.add(s);
} while (rs.next());
src/java/azkaban/scheduler/Schedule.java 260(+225 -35)
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 98b6973..1c80c58 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,8 +16,6 @@
package azkaban.scheduler;
-
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,11 +32,153 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.Seconds;
import org.joda.time.Weeks;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.Pair;
public class Schedule{
+
+ public static class FlowOptions {
+
+ public List<String> getFailureEmails() {
+ return failureEmails;
+ }
+ public void setFailureEmails(List<String> failureEmails) {
+ this.failureEmails = failureEmails;
+ }
+ public List<String> getSuccessEmails() {
+ return successEmails;
+ }
+ public void setSuccessEmails(List<String> successEmails) {
+ this.successEmails = successEmails;
+ }
+ public FailureAction getFailureAction() {
+ return failureAction;
+ }
+ public void setFailureAction(FailureAction failureAction) {
+ this.failureAction = failureAction;
+ }
+ public boolean isnotifyOnFirstFailure() {
+ return notifyOnFirstFailure;
+ }
+ public void setNotifyOnFirstFailure(boolean notifyOnFirstFailure) {
+ this.notifyOnFirstFailure = notifyOnFirstFailure;
+ }
+ public boolean isnotifyOnLastFailure() {
+ return notifyOnLastFailure;
+ }
+ public void setNotifyOnLastFailure(boolean notifyOnLastFailure) {
+ this.notifyOnLastFailure = notifyOnLastFailure;
+ }
+ public Map<String, String> getFlowOverride() {
+ return flowOverride;
+ }
+ public void setFlowOverride(Map<String, String> flowOverride) {
+ this.flowOverride = flowOverride;
+ }
+ public List<String> getDisabledJobs() {
+ return disabledJobs;
+ }
+ public void setDisabledJobs(List<String> disabledJobs) {
+ this.disabledJobs = disabledJobs;
+ }
+ private List<String> failureEmails;
+ private List<String> successEmails;
+ private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+ private boolean notifyOnFirstFailure;
+ private boolean notifyOnLastFailure;
+ Map<String, String> flowOverride;
+ private List<String> disabledJobs;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("failureEmails", failureEmails);
+ obj.put("successEmails", successEmails);
+ obj.put("failureAction", failureAction.toString());
+ obj.put("notifyOnFirstFailure", notifyOnFirstFailure);
+ obj.put("notifyOnLastFailure", notifyOnLastFailure);
+ obj.put("flowOverride", flowOverride);
+ obj.put("disabledJobs", disabledJobs);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static FlowOptions fromObject(Object object) {
+ if(object != null) {
+ FlowOptions flowOptions = new FlowOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ if(obj.containsKey("failureEmails")) {
+ flowOptions.setFailureEmails((List<String>) obj.get("failureEmails"));
+ }
+ if(obj.containsKey("successEmails")) {
+ flowOptions.setSuccessEmails((List<String>) obj.get("SuccessEmails"));
+ }
+ if(obj.containsKey("failureAction")) {
+ flowOptions.setFailureAction(FailureAction.valueOf((String)obj.get("failureAction")));
+ }
+ if(obj.containsKey("notifyOnFirstFailure")) {
+ flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnFirstFailure"));
+ }
+ if(obj.containsKey("notifyOnLastFailure")) {
+ flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnLastFailure"));
+ }
+ if(obj.containsKey("flowOverride")) {
+ flowOptions.setFlowOverride((Map<String, String>) obj.get("flowOverride"));
+ }
+ if(obj.containsKey("disabledJobs")) {
+ flowOptions.setDisabledJobs((List<String>) obj.get("disabledJobs"));
+ }
+ return flowOptions;
+ }
+ return null;
+ }
+ }
+
+ public static class SlaOptions {
+ public List<String> getSlaEmails() {
+ return slaEmails;
+ }
+ public void setSlaEmails(List<String> slaEmails) {
+ this.slaEmails = slaEmails;
+ }
+ public List<SlaSetting> getSettings() {
+ return settings;
+ }
+ public void setSettings(List<SlaSetting> settings) {
+ this.settings = settings;
+ }
+ private List<String> slaEmails;
+ private List<SlaSetting> settings;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("slaEmails", slaEmails);
+ List<Object> slaSettings = new ArrayList<Object>();
+ for(SlaSetting s : settings) {
+ slaSettings.add(s.toObject());
+ }
+ obj.put("settings", slaSettings);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static SlaOptions fromObject(Object object) {
+ if(object != null) {
+ SlaOptions slaOptions = new SlaOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ for(Object set: (List<Object>)obj.get("settings")) {
+ slaSets.add(SlaSetting.fromObject(set));
+ }
+ slaOptions.setSettings(slaSets);
+ return slaOptions;
+ }
+ return null;
+ }
+
+ }
+
// private long projectGuid;
// private long flowGuid;
@@ -55,7 +195,9 @@ public class Schedule{
private String submitUser;
private String status;
private long submitTime;
- private Map<String, Object> schedOptions;
+
+ private FlowOptions flowOptions;
+ private SlaOptions slaOptions;
public Schedule(
int projectId,
@@ -68,8 +210,7 @@ public class Schedule{
long lastModifyTime,
long nextExecTime,
long submitTime,
- String submitUser,
- Map<String, Object> schedOptions
+ String submitUser
) {
this.projectId = projectId;
this.projectName = projectName;
@@ -82,7 +223,8 @@ public class Schedule{
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
- this.schedOptions = schedOptions;
+ this.flowOptions = null;
+ this.slaOptions = null;
}
public Schedule(
@@ -97,7 +239,8 @@ public class Schedule{
long nextExecTime,
long submitTime,
String submitUser,
- Map<String, Object> schedOptions
+ FlowOptions flowOptions,
+ SlaOptions slaOptions
) {
this.projectId = projectId;
this.projectName = projectName;
@@ -110,15 +253,54 @@ public class Schedule{
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
- this.schedOptions = schedOptions;
+ this.flowOptions = flowOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ FlowOptions flowOptions,
+ SlaOptions slaOptions
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = timezone;
+ this.lastModifyTime = lastModifyTime;
+ this.period = period;
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.flowOptions = flowOptions;
+ this.slaOptions = slaOptions;
}
- public Map<String, Object> getSchedOptions() {
- return schedOptions;
+ public FlowOptions getFlowOptions() {
+ return flowOptions;
}
- public void setSchedOptions(Map<String, Object> schedOptions) {
- this.schedOptions = schedOptions;
+ public void setFlowOptions(FlowOptions flowOptions) {
+ this.flowOptions = flowOptions;
+ }
+
+ public SlaOptions getSlaOptions() {
+ return slaOptions;
+ }
+
+ public void setSlaOptions(SlaOptions slaOptions) {
+ this.slaOptions = slaOptions;
}
public String getScheduleName() {
@@ -277,34 +459,42 @@ public class Schedule{
}
- public Map<String,Object> optionToObject() {
- //HashMap<String, Object> optionObject = new HashMap<String, Object>();
-
-
- return schedOptions;
+ public Map<String,Object> optionsToObject() {
+ if(flowOptions != null || slaOptions != null) {
+ HashMap<String, Object> schedObj = new HashMap<String, Object>();
+
+ if(flowOptions != null) {
+ schedObj.put("flowOptions", flowOptions.toObject());
+ }
+ if(slaOptions != null) {
+ schedObj.put("slaOptions", slaOptions.toObject());
+ }
+
+ return schedObj;
+ }
+ return null;
}
@SuppressWarnings("unchecked")
- public static Map<String, Object> createScheduleOptionFromObject(Object obj) {
- Map<String, Object> options = new HashMap<String, Object>();
+ public static FlowOptions createFlowOptionFromObject(Object obj) {
if(obj != null) {
- HashMap<String, Object> optionObject = (HashMap<String,Object>)obj;
- options.putAll(optionObject);
- return options;
- }
- else {
- return new HashMap<String, Object>();
- }
+ Map<String, Object> options = (HashMap<String, Object>) obj;
+ if(options.containsKey("flowOptions")) {
+ return FlowOptions.fromObject(options.get("flowOptions"));
+ }
+ }
+ return null;
}
-
+
@SuppressWarnings("unchecked")
- public List<String> getDisabledJobs() {
- if (schedOptions.containsKey("disabled")) {
- return (List<String>) schedOptions.get("disabled");
- }
- else {
- return new ArrayList<String>();
- }
+ public static SlaOptions createSlaOptionFromObject(Object obj) {
+ if(obj != null) {
+ Map<String, Object> options = (HashMap<String, Object>) obj;
+ if(options.containsKey("slaOptions")) {
+ return SlaOptions.fromObject(options.get("slaOptions"));
+ }
+ }
+ return null;
}
-
+
}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduleManager.java 98(+52 -46)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 9c3c459..ad67def 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -44,6 +44,12 @@ import azkaban.project.Project;
import azkaban.project.ProjectManager;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.Pair;
@@ -63,6 +69,7 @@ public class ScheduleManager {
private final ScheduleRunner runner;
private final ExecutorManager executorManager;
private final ProjectManager projectManager;
+ private final SLAManager slaManager;
/**
* Give the schedule manager a loader class that will properly load the
@@ -72,10 +79,12 @@ public class ScheduleManager {
*/
public ScheduleManager(ExecutorManager executorManager,
ProjectManager projectManager,
+ SLAManager slaManager,
ScheduleLoader loader)
{
this.executorManager = executorManager;
this.projectManager = projectManager;
+ this.slaManager = slaManager;
this.loader = loader;
this.runner = new ScheduleRunner();
@@ -173,9 +182,10 @@ public class ScheduleManager {
final long nextExecTime,
final long submitTime,
final String submitUser,
- final Map<String, Object> options
+ final FlowOptions flowOptions,
+ final SlaOptions slaOptions
) {
- Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, options);
+ Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
@@ -367,57 +377,32 @@ public class ScheduleManager {
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(runningSched.getSubmitUser());
- Map<String, Object> scheduleOptions = runningSched.getSchedOptions();
+ FlowOptions flowOptions = runningSched.getFlowOptions();
- if(scheduleOptions != null && scheduleOptions.containsKey("flowOptions")) {
- Map<String, Object> flowOptions = (Map<String, Object>) scheduleOptions.get("flowOptions");
-
- if (flowOptions.containsKey("failureAction")) {
- String option = (String) flowOptions.get("failureAction");
- if (option.equals("finishCurrent") ) {
- exflow.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
- }
- else if (option.equals("cancelImmediately")) {
- exflow.setFailureAction(FailureAction.CANCEL_ALL);
- }
- else if (option.equals("finishPossible")) {
- exflow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
- }
- }
-
- if (flowOptions.containsKey("failureEmails")) {
- String emails = (String) flowOptions.get("failureEmails");
- String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- exflow.setFailureEmails(Arrays.asList(emailSplit));
- }
- if (flowOptions.containsKey("successEmails")) {
- String emails = (String) flowOptions.get("successEmails");
- String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- exflow.setSuccessEmails(Arrays.asList(emailSplit));
- }
- if (flowOptions.containsKey("notifyFailureFirst")) {
- exflow.setNotifyOnFirstFailure(Boolean.parseBoolean((String)flowOptions.get("notifyFailureFirst")));
+ if(flowOptions != null) {
+ if (flowOptions.getFailureAction() != null) {
+ exflow.setFailureAction(flowOptions.getFailureAction());
}
- if (flowOptions.containsKey("notifyFailureLast")) {
- exflow.setNotifyOnLastFailure(Boolean.parseBoolean((String)flowOptions.get("notifyFailureLast")));
+ if (flowOptions.getFailureEmails() != null) {
+ exflow.setFailureEmails(flowOptions.getFailureEmails());
}
- if (flowOptions.containsKey("executingJobOption")) {
- String option = (String)flowOptions.get("jobOption");
- // Not set yet
+ if (flowOptions.getSuccessEmails() != null) {
+ exflow.setSuccessEmails(flowOptions.getSuccessEmails());
}
+ exflow.setNotifyOnFirstFailure(flowOptions.isnotifyOnFirstFailure());
+ exflow.setNotifyOnLastFailure(flowOptions.isnotifyOnLastFailure());
- Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
- exflow.addFlowParameters(flowParamGroup);
+ exflow.addFlowParameters(flowOptions.getFlowOverride());
+ List<String> disabled = flowOptions.getDisabledJobs();
// Setup disabled
- Map<String, String> paramGroup = this.getParamGroup(req, "disable");
- for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
- boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
- exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+ if(disabled != null) {
+ for (String job : disabled) {
+ exflow.setNodeStatus(job, Status.DISABLED);
+ }
}
-
}
-
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -426,9 +411,30 @@ public class ScheduleManager {
logger.error(e.getMessage());
return;
}
- } catch (JobExecutionException e) {
- logger.info("Could not run flow. " + e.getMessage());
+
+ SlaOptions slaOptions = runningSched.getSlaOptions();
+
+ if(slaOptions != null) {
+ // submit flow slas
+ List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+ for(SlaSetting set : slaOptions.getSettings()) {
+ if(set.getId().equals("")) {
+ DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+ slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+ }
+ else {
+ jobsettings.add(set);
+ }
+ }
+ if(jobsettings.size() > 0) {
+ slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+ }
+ }
+
+ } catch (Exception e) {
+ logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
+
}
removeRunnerSchedule(runningSched);
src/java/azkaban/sla/JdbcSLALoader.java 330(+330 -0)
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
new file mode 100644
index 0000000..cf6cd2f
--- /dev/null
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -0,0 +1,330 @@
+package azkaban.sla;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+public class JdbcSLALoader implements SLALoader {
+
+ private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
+
+ /**
+ * Used for when we store text data. Plain uses UTF8 encoding.
+ */
+ public static enum EncodingType {
+ PLAIN(1), GZIP(2);
+
+ private int numVal;
+
+ EncodingType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EncodingType fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return PLAIN;
+ case 2:
+ return GZIP;
+ default:
+ return PLAIN;
+ }
+ }
+ }
+
+ private DataSource dataSource;
+ private EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ private static String slaTblName = "active_sla";
+
+ final private static String LOAD_ALL_SLA =
+ "SELECT exec_id, job_name, check_time, rule, enc_type, options FROM " + slaTblName;
+
+ final private static String INSERT_SLA =
+ "INSERT INTO " + slaTblName + " ( exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+
+// final private static String UPDATE_SLA =
+// "UPDATE " + slaTblName + " SET exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+//
+ // use "" for flow
+ final private static String REMOVE_SLA =
+ "DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
+
+ public JdbcSLALoader(Props props) {
+ String databaseType = props.getString("database.type");
+
+ if (databaseType.equals("mysql")) {
+ int port = props.getInt("mysql.port");
+ String host = props.getString("mysql.host");
+ String database = props.getString("mysql.database");
+ String user = props.getString("mysql.user");
+ String password = props.getString("mysql.password");
+ int numConnections = props.getInt("mysql.numconnections");
+
+ dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ }
+ }
+
+ private Connection getConnection() throws SLAManagerException {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new SLAManagerException("Error getting DB connection.", e);
+ }
+ return connection;
+ }
+
+ public EncodingType getDefaultEncodingType() {
+ return defaultEncodingType;
+ }
+
+ public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ this.defaultEncodingType = defaultEncodingType;
+ }
+
+ @Override
+ public List<SLA> loadSLAs() throws SLAManagerException {
+ logger.info("Loading all SLAs from db.");
+
+ Connection connection = getConnection();
+ List<SLA> SLAs;
+ try{
+ SLAs= loadSLAs(connection, defaultEncodingType);
+ }
+ catch(SLAManagerException e) {
+ throw new SLAManagerException("Failed to load SLAs" + e.getCause());
+ }
+ finally{
+ DbUtils.closeQuietly(connection);
+ }
+
+ logger.info("Loaded " + SLAs.size() + " SLAs.");
+
+ return SLAs;
+
+ }
+
+ public List<SLA> loadSLAs(Connection connection, EncodingType encType) throws SLAManagerException {
+ logger.info("Loading all SLAs from db.");
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<SLA>> handler = new SLAResultHandler();
+ List<SLA> SLAs;
+
+ try {
+ SLAs = runner.query(connection, LOAD_ALL_SLA, handler);
+ } catch (SQLException e) {
+ logger.error(LOAD_ALL_SLA + " failed.");
+ throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+ }
+
+ return SLAs;
+
+ }
+
+ @Override
+ public void removeSLA(SLA s) throws SLAManagerException {
+ Connection connection = getConnection();
+ try{
+ removeSLA(connection, s);
+ }
+ catch(SLAManagerException e) {
+ logger.error(LOAD_ALL_SLA + " failed.");
+ throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ }
+
+ private void removeSLA(Connection connection, SLA s) throws SLAManagerException {
+
+ logger.info("Removing SLA " + s.toString() + " from db.");
+
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ int removes = runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
+ if (removes == 0) {
+ throw new SLAManagerException("No schedule has been removed.");
+ }
+ } catch (SQLException e) {
+ logger.error("Remove SLA failed. " + s.toString());
+ throw new SLAManagerException("Remove SLA " + s.toString() + " from db failed. "+ e.getCause());
+ }
+
+ }
+
+ @Override
+ public void insertSLA(SLA s) throws SLAManagerException {
+ Connection connection = getConnection();
+ try{
+ insertSLA(connection, s, defaultEncodingType);
+ }
+ catch(SLAManagerException e){
+ logger.error("Insert SLA failed. " + s.toString());
+ throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void insertSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+
+ logger.debug("Inserting new SLA into DB. " + s.toString());
+
+ QueryRunner runner = new QueryRunner();
+
+ String json = JSONUtils.toJSON(s.optionToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new SLAManagerException("Error encoding the sla options.");
+ }
+
+ try {
+ int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+
+ if (inserts == 0) {
+ throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+ }
+ } catch (SQLException e) {
+ logger.error("Insert SLA failed.");
+ throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+ }
+
+ }
+
+// @Override
+// public void updateSLA(SLA s) throws SLAManagerException {
+// Connection connection = getConnection();
+// try{
+// insertSLA(connection, s, defaultEncodingType);
+// }
+// catch(SLAManagerException e){
+// logger.error("Insert SLA failed. " + s.toString());
+// throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+// }
+// finally {
+// DbUtils.closeQuietly(connection);
+// }
+// }
+//
+// private void updateSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+//
+// logger.debug("Inserting new SLA into DB. " + s.toString());
+//
+// QueryRunner runner = new QueryRunner();
+//
+// String json = JSONUtils.toJSON(s.optionToObject());
+// byte[] data = null;
+// try {
+// byte[] stringData = json.getBytes("UTF-8");
+// data = stringData;
+//
+// if (encType == EncodingType.GZIP) {
+// data = GZIPUtils.gzipBytes(stringData);
+// }
+// logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+// }
+// catch (IOException e) {
+// throw new SLAManagerException("Error encoding the sla options.");
+// }
+//
+// try {
+// int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+//
+// if (inserts == 0) {
+// throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+// }
+// } catch (SQLException e) {
+// logger.error("Insert SLA failed.");
+// throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+// }
+//
+// }
+
+ public class SLAResultHandler implements ResultSetHandler<List<SLA>> {
+ @Override
+ public List<SLA> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<SLA>emptyList();
+ }
+
+ ArrayList<SLA> SLAs = new ArrayList<SLA>();
+
+ do {
+ int execId = rs.getInt(1);
+ String jobName = rs.getString(2);
+ DateTime checkTime = new DateTime(rs.getLong(3));
+ SlaRule rule = SlaRule.fromInteger(rs.getInt(4));
+ int encodingType = rs.getInt(5);
+ byte[] data = rs.getBytes(6);
+
+ SLA s = null;
+
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object optsObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ s = SLA.createSlaFromObject(execId, jobName, checkTime, rule, optsObj);
+ } catch (IOException e) {
+ throw new SQLException("Error reconstructing SLA options. " + execId + " " + jobName + " " + checkTime.toDateTimeISO() + e.getCause());
+ }
+ SLAs.add(s);
+
+ } while (rs.next());
+
+ return SLAs;
+ }
+
+ }
+
+}
src/java/azkaban/sla/SLA.java 252(+252 -0)
diff --git a/src/java/azkaban/sla/SLA.java b/src/java/azkaban/sla/SLA.java
new file mode 100644
index 0000000..eb08229
--- /dev/null
+++ b/src/java/azkaban/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.Schedule;
+
+public class SLA {
+
+ public static enum SlaRule {
+ SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+ private int numVal;
+
+ SlaRule(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaRule fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return SUCCESS;
+ case 2:
+ return FINISH;
+ case 3:
+ return WAITANDCHECKJOB;
+ default:
+ return SUCCESS;
+ }
+ }
+ }
+
+ public static enum SlaAction {
+ EMAIL(1), KILL(2);
+
+ private int numVal;
+
+ SlaAction(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaAction fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return EMAIL;
+ case 2:
+ return KILL;
+ default:
+ return EMAIL;
+ }
+ }
+ }
+
+ public static class SlaSetting {
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public ReadablePeriod getDuration() {
+ return duration;
+ }
+ public void setDuration(ReadablePeriod duration) {
+ this.duration = duration;
+ }
+ public SlaRule getRule() {
+ return rule;
+ }
+ public void setRule(SlaRule rule) {
+ this.rule = rule;
+ }
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+ public void setActions(List<SlaAction> actions) {
+ this.actions = actions;
+ }
+
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("id", id);
+ obj.put("duration", Schedule.createPeriodString(duration));
+// List<String> rulesObj = new ArrayList<String>();
+// for(SlaRule rule : rules) {
+// rulesObj.add(rule.toString());
+// }
+// obj.put("rules", rulesObj);
+ obj.put("rule", rule.toString());
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ obj.put("actions", actionsObj);
+ return obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaSetting fromObject(Object obj) {
+ Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+ String subId = (String) slaObj.get("id");
+ ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+// List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+// List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+// for(String rule : rulesObj) {
+// slaRules.add(SlaRule.valueOf(rule));
+// }
+ SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+
+ SlaSetting ret = new SlaSetting();
+ ret.setId(subId);
+ ret.setDuration(dur);
+ ret.setRule(slaRule);
+ ret.setActions(slaActs);
+ return ret;
+ }
+
+ private String id;
+ private ReadablePeriod duration;
+ private SlaRule rule = SlaRule.SUCCESS;
+ private List<SlaAction> actions;
+ }
+
+ private int execId;
+ private String jobName;
+ private DateTime checkTime;
+ private List<String> emails;
+ private List<SlaAction> actions;
+ private List<SlaSetting> jobSettings;
+ private SlaRule rule;
+
+ public SLA(
+ int execId,
+ String jobName,
+ DateTime checkTime,
+ List<String> emails,
+ List<SlaAction> slaActions,
+ List<SlaSetting> jobSettings,
+ SlaRule slaRule
+ ) {
+ this.execId = execId;
+ this.jobName = jobName;
+ this.checkTime = checkTime;
+ this.emails = emails;
+ this.actions = slaActions;
+ this.jobSettings = jobSettings;
+ this.rule = slaRule;
+ }
+
+ public int getExecId() {
+ return execId;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public DateTime getCheckTime() {
+ return checkTime;
+ }
+
+ public List<String> getEmails() {
+ return emails;
+ }
+
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+
+ public List<SlaSetting> getJobSettings() {
+ return jobSettings;
+ }
+
+ public SlaRule getRule() {
+ return rule;
+ }
+
+ public String toString() {
+ return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+ }
+
+ public Map<String,Object> optionToObject() {
+ HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+ slaObj.put("emails", emails);
+// slaObj.put("rule", rule.toString());
+
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ slaObj.put("actions", actionsObj);
+
+ if(jobSettings != null && jobSettings.size() > 0) {
+ List<Object> settingsObj = new ArrayList<Object>();
+ for(SlaSetting set : jobSettings) {
+ settingsObj.add(set.toObject());
+ }
+ slaObj.put("jobSettings", settingsObj);
+ }
+
+
+ return slaObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+
+ HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+ List<String> emails = (List<String>)slaObj.get("emails");
+// SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+ List<SlaSetting> jobSets = null;
+ if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+ jobSets = new ArrayList<SLA.SlaSetting>();
+ for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+ SlaSetting jobSet = SlaSetting.fromObject(set);
+ jobSets.add(jobSet);
+ }
+ }
+
+ return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+ }
+
+ public void setCheckTime(DateTime time) {
+ this.checkTime = time;
+ }
+
+}
src/java/azkaban/sla/SLALoader.java 14(+14 -0)
diff --git a/src/java/azkaban/sla/SLALoader.java b/src/java/azkaban/sla/SLALoader.java
new file mode 100644
index 0000000..2c32b33
--- /dev/null
+++ b/src/java/azkaban/sla/SLALoader.java
@@ -0,0 +1,14 @@
+package azkaban.sla;
+
+import java.util.List;
+
+public interface SLALoader {
+
+ public void insertSLA(SLA s) throws SLAManagerException;
+
+ public List<SLA> loadSLAs() throws SLAManagerException;
+
+ public void removeSLA(SLA s) throws SLAManagerException;
+
+// public void updateSLA(SLA s) throws SLAManagerException;
+}
src/java/azkaban/sla/SlaMailer.java 228(+228 -0)
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
new file mode 100644
index 0000000..17a67d0
--- /dev/null
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -0,0 +1,228 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.mail.MessagingException;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.sla.SLA;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class SlaMailer {
+ private static Logger logger = Logger.getLogger(SlaMailer.class);
+
+ private boolean testMode = false;
+ private String clientHostname;
+ private String clientPortNumber;
+
+ private String mailHost;
+ private String mailUser;
+ private String mailPassword;
+ private String mailSender;
+ private String azkabanName;
+
+ public SlaMailer(Props props) {
+ this.azkabanName = props.getString("azkaban.name", "azkaban");
+ this.mailHost = props.getString("mail.host", "localhost");
+ this.mailUser = props.getString("mail.user", "");
+ this.mailPassword = props.getString("mail.password", "");
+ this.mailSender = props.getString("mail.sender", "");
+
+ this.clientHostname = props.getString("jetty.hostname", "localhost");
+ this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+
+ testMode = props.getBoolean("test.mode", false);
+ }
+
+ public void sendFirstErrorMessage(ExecutableFlow flow) {
+ List<String> emailList = flow.getFailureEmails();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+
+ if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+ message.println("This flow is set to cancel all currently running jobs.");
+ }
+ else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
+ message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+ }
+ else {
+ message.println("This flow is set to complete all currently running jobs before stopping.");
+ }
+
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+ }
+
+ message.println("</ul>");
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
+ public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+ List<String> emailList = flow.getFailureEmails();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+ }
+ for (String reasons: extraReasons) {
+ message.println("<li>" + reasons + "</li>");
+ }
+
+ message.println("</ul>");
+
+
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
+ public void sendSuccessEmail(ExecutableFlow flow) {
+ List<String> emailList = flow.getSuccessEmails();
+
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+
+ message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
+ private List<String> findFailedJobs(ExecutableFlow flow) {
+ ArrayList<String> failedJobs = new ArrayList<String>();
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ if (node.getStatus() == Status.FAILED) {
+ failedJobs.add(node.getJobId());
+ }
+ }
+
+ return failedJobs;
+ }
+
+ public void sendSlaEmail(SLA s, String ... extraReasons) {
+ List<String> emailList = s.getEmails();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("SLA violation on " + azkabanName);
+
+// message.println("<h2 style=\"color:#FF0000\"> Execution '" + s.getExecId() + "' of flow '" + flow.getFlowId() + "' failed to meet SLA on " + azkabanName + "</h2>");
+// message.println("<table>");
+// message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+// message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+// message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+// message.println("</table>");
+// message.println("");
+// String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+// message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+//
+// message.println("");
+// message.println("<h3>Reason</h3>");
+// List<String> failedJobs = findFailedJobs(flow);
+// message.println("<ul>");
+// for (String jobId : failedJobs) {
+// message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+// }
+ for (String reasons: extraReasons) {
+ message.println("<li>" + reasons + "</li>");
+ }
+
+ message.println("</ul>");
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
src/java/azkaban/sla/SLAManager.java 467(+467 -0)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
new file mode 100644
index 0000000..ae17595
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -0,0 +1,467 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.velocity.runtime.parser.node.GetExecutor;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorMailer;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
+import azkaban.user.User;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+
+/**
+ * The SLAManager stores and checks the SLA (Service Level Agreement). It uses a single thread
+ * instead and waits until correct check time for the flow, and individual jobs in the flow if their SLA is set.
+ */
+public class SLAManager {
+ private static Logger logger = Logger.getLogger(SLAManager.class);
+
+ private SLALoader loader;
+
+ private final SLARunner runner;
+ private final SLAPreRunner prerunner;
+ private final ExecutorManager executorManager;
+ private SlaMailer mailer;
+
+ /**
+ * Give the sla manager a loader class that will properly load the
+ * sla.
+ *
+ * @param loader
+ * @throws SLAManagerException
+ */
+ public SLAManager(ExecutorManager executorManager,
+ SLALoader loader,
+ Props props) throws SLAManagerException
+ {
+ this.executorManager = executorManager;
+ this.loader = loader;
+ this.mailer = new SlaMailer(props);
+ this.runner = new SLARunner();
+ this.prerunner = new SLAPreRunner();
+
+ List<SLA> SLAList = null;
+ try {
+ SLAList = loader.loadSLAs();
+ } catch (SLAManagerException e) {
+ // TODO Auto-generated catch block
+ throw e;
+ }
+
+ for (SLA sla : SLAList) {
+ runner.addRunnerSLA(sla);
+ }
+
+ this.runner.start();
+ }
+
+ /**
+ * Shutdowns the sla thread. After shutdown, it may not be safe to use
+ * it again.
+ */
+ public void shutdown() {
+ this.runner.shutdown();
+ this.prerunner.shutdown();
+ }
+
+ /**
+ * Removes the flow from the SLA if it exists.
+ *
+ * @param id
+ * @throws SLAManagerException
+ */
+ public void removeSLA(SLA s) throws SLAManagerException {
+ logger.info("Removing SLA " + s.toString());
+ runner.removeRunnerSLA(s);
+ loader.removeSLA(s);
+ }
+
+ public void submitSla(
+ int execId,
+ String id,
+ DateTime checkTime,
+ List<String> emails,
+ List<SlaAction> slaActions,
+ List<SlaSetting> jobSettings,
+ SlaRule slaRule
+ ) throws SLAManagerException {
+ SLA s = new SLA(execId, id, checkTime, emails, slaActions, jobSettings, slaRule);
+ logger.info("Submitting SLA " + s.toString());
+ try {
+ loader.insertSLA(s);
+ runner.addRunnerSLA(s);
+ }
+ catch (SLAManagerException e) {
+ throw new SLAManagerException("Failed to add new SLA!" + e.getCause());
+ }
+ }
+
+ /**
+ * Thread that simply invokes the checking of flows when the SLA is
+ * ready.
+ *
+ */
+ public class SLARunner extends Thread {
+ private final PriorityBlockingQueue<SLA> SLAs;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 60000;
+
+ public SLARunner() {
+ SLAs = new PriorityBlockingQueue<SLA>(1,new SLAComparator());
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down SLA runner thread");
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ /**
+ * Return a list of flow with SLAs
+ *
+ * @return
+ */
+ protected synchronized List<SLA> getRunnerSLAs() {
+ return new ArrayList<SLA>(SLAs);
+ }
+
+ /**
+ * Adds SLA into runner and then interrupts so it will update
+ * its wait time.
+ *
+ * @param flow
+ */
+ public synchronized void addRunnerSLA(SLA s) {
+ logger.info("Adding " + s + " to SLA runner.");
+ SLAs.add(s);
+ this.interrupt();
+ }
+
+ /**
+ * Remove runner SLA. Does not interrupt.
+ *
+ * @param flow
+ * @throws SLAManagerException
+ */
+ public synchronized void removeRunnerSLA(SLA s) {
+ logger.info("Removing " + s + " from the SLA runner.");
+ SLAs.remove(s);
+ }
+
+ public void run() {
+ while (stillAlive.get()) {
+ synchronized (this) {
+ try {
+ // TODO clear up the exception handling
+ SLA s = SLAs.peek();
+
+ if (s == null) {
+ // If null, wake up every minute or so to see if
+ // there's something to do. Most likely there will not be.
+ try {
+ this.wait(TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // interruption should occur when items are added or removed from the queue.
+ }
+ } else {
+ // We've passed the flow execution time, so we will run.
+ if (!(new DateTime(s.getCheckTime())).isAfterNow()) {
+ // Run flow. The invocation of flows should be quick.
+ SLA runningSLA = SLAs.poll();
+
+ logger.info("Checking sla " + runningSLA.toString() );
+
+ int execId = s.getExecId();
+ ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
+
+ if(runningSLA.getJobName().equals("") && runningSLA.getRule().equals(SlaRule.WAITANDCHECKJOB)) {
+ // do the checking of potential jobsla submissions
+ List<SlaSetting> jobSettings = runningSLA.getJobSettings();
+ List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
+ for(SlaSetting set : jobSettings) {
+ ExecutableNode node = exflow.getExecutableNode(set.getId());
+ if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
+ submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+ removeSettings.add(set);
+ logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+ }
+ }
+ for(SlaSetting remove : removeSettings) {
+ jobSettings.remove(remove);
+ }
+ if(jobSettings.size() == 0) {
+ removeRunnerSLA(runningSLA);
+ loader.removeSLA(runningSLA);
+ }
+ else {
+ removeRunnerSLA(runningSLA);
+ loader.removeSLA(runningSLA);
+ runningSLA.setCheckTime(runningSLA.getCheckTime().plusMillis(TIMEOUT_MS));
+ addRunnerSLA(runningSLA);
+ loader.insertSLA(runningSLA);
+ }
+ }
+ else {
+ if(!metSla(runningSLA, exflow)) {
+ takeSLAActions(runningSLA, exflow);
+ }
+
+
+ removeRunnerSLA(runningSLA);
+ loader.removeSLA(runningSLA);
+ }
+ } else {
+ // wait until flow run
+ long millisWait = Math.max(0, s.getCheckTime().getMillis() - (new DateTime()).getMillis());
+ try {
+ this.wait(Math.min(millisWait, TIMEOUT_MS));
+ } catch (InterruptedException e) {
+ // interruption should occur when items are
+ // added or removed from the queue.
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected exception has been thrown in scheduler", e);
+ } catch (Throwable e) {
+ logger.error("Unexpected throwable has been thrown in scheduler", e);
+ }
+ }
+ }
+ }
+
+ private boolean metSla(SLA s, ExecutableFlow exflow) {
+ SlaRule rule = s.getRule();
+ long finishTime;
+ Status status;
+ if(s.getJobName().equals("")) {
+ finishTime = exflow.getEndTime();
+ status = exflow.getStatus();
+ }
+ else {
+ ExecutableNode exnode = exflow.getExecutableNode(s.getJobName());
+ finishTime = exnode.getEndTime();
+ status = exnode.getStatus();
+ }
+
+ switch(rule) {
+ case FINISH: // check finish time
+ return finishTime != -1 && finishTime < s.getCheckTime().getMillis();
+ case SUCCESS: // check finish and successful
+ return status == Status.SUCCEEDED && finishTime < s.getCheckTime().getMillis();
+ default:
+ logger.error("Unknown SLA rules!");
+ return false;
+ }
+ }
+
+ /**
+ * Class to sort the sla based on time.
+ *
+ */
+ private class SLAComparator implements Comparator<SLA> {
+ @Override
+ public int compare(SLA arg0, SLA arg1) {
+ long first = arg1.getCheckTime().getMillis();
+ long second = arg0.getCheckTime().getMillis();
+
+ if (first == second) {
+ return 0;
+ } else if (first < second) {
+ return 1;
+ }
+
+ return -1;
+ }
+ }
+ }
+
+ private void takeSLAActions(SLA s, ExecutableFlow exflow) {
+ logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
+ List<SlaAction> actions = s.getActions();
+ for(SlaAction act : actions) {
+ if(act.equals(SlaAction.EMAIL)) {
+ try {
+ sendSlaAlertEmail(s, exflow);
+ }
+ catch (Exception e) {
+ logger.error("Failed to send out SLA alert email. " + e.getCause());
+ }
+ } else if(act.equals(SlaAction.KILL)) {
+ try {
+ executorManager.cancelFlow(exflow, "azkaban");
+ //sendSlaKillEmail(s, exflow);
+ } catch (ExecutorManagerException e) {
+ // TODO Auto-generated catch block
+ logger.error("Cancel flow failed." + e.getCause());
+ }
+ }
+ }
+ }
+
+ private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
+ String message = null;
+ ExecutableNode exnode;
+ switch(s.getRule()) {
+ case FINISH:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ case SUCCESS:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA. " + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format(" %n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA." + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ default:
+ logger.error("Unknown SLA rules!");
+ message = "Unknown SLA was not met!";
+ break;
+ }
+ mailer.sendSlaEmail(s, message);
+ }
+
+ public class SLAPreRunner extends Thread {
+ private final List<SLA> preSlas;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 300000;
+
+ public SLAPreRunner() {
+ preSlas = new ArrayList<SLA>();
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down pre-sla checker thread");
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ /**
+ * Return a list of flow with SLAs
+ *
+ * @return
+ */
+ protected synchronized List<SLA> getPreSlas() {
+ return new ArrayList<SLA>(preSlas);
+ }
+
+ /**
+ * Adds SLA into runner and then interrupts so it will update
+ * its wait time.
+ *
+ * @param flow
+ */
+ public synchronized void addCheckerPreSla(SLA s) {
+ logger.info("Adding " + s + " to pre-sla checker.");
+ preSlas.add(s);
+ this.interrupt();
+ }
+
+ /**
+ * Remove runner SLA. Does not interrupt.
+ *
+ * @param flow
+ * @throws SLAManagerException
+ */
+ public synchronized void removeCheckerPreSla(SLA s) {
+ logger.info("Removing " + s + " from the pre-sla checker.");
+ preSlas.remove(s);
+ }
+
+ public void run() {
+ while (stillAlive.get()) {
+ synchronized (this) {
+ try {
+ // TODO clear up the exception handling
+
+ if (preSlas.size() == 0) {
+ try {
+ this.wait(TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // interruption should occur when items are added or removed from the queue.
+ }
+ } else {
+ for(SLA s : preSlas) {
+ ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
+ String id = s.getJobName();
+ if(!s.equals("")) {
+ ExecutableNode exnode = exflow.getExecutableNode(id);
+ if(exnode.getStartTime() != -1) {
+
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected exception has been thrown in scheduler", e);
+ } catch (Throwable e) {
+ logger.error("Unexpected throwable has been thrown in scheduler", e);
+ }
+ }
+ }
+ }
+ }
+
+
+}
diff --git a/src/java/azkaban/sla/SLAManagerException.java b/src/java/azkaban/sla/SLAManagerException.java
new file mode 100644
index 0000000..a3468a2
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManagerException.java
@@ -0,0 +1,13 @@
+package azkaban.sla;
+
+public class SLAManagerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public SLAManagerException(String message) {
+ super(message);
+ }
+
+ public SLAManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 22d1af1..46e2cad 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -53,6 +53,7 @@ import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.sla.JdbcSLALoader;
import azkaban.sla.SLAManager;
+import azkaban.sla.SLAManagerException;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -147,8 +148,8 @@ public class AzkabanWebServer implements AzkabanServer {
userManager = loadUserManager(props);
projectManager = loadProjectManager(props);
executorManager = loadExecutorManager(props);
- scheduleManager = loadScheduleManager(executorManager, props);
- slaManager = loadSLAManager();
+ slaManager = loadSLAManager(props);
+ scheduleManager = loadScheduleManager(executorManager, slaManager, props);
baseClassLoader = getBaseClassloader();
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -208,14 +209,14 @@ public class AzkabanWebServer implements AzkabanServer {
return execManager;
}
- private ScheduleManager loadScheduleManager(ExecutorManager execManager, Props props ) throws Exception {
- ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, new JdbcScheduleLoader(props));
+ private ScheduleManager loadScheduleManager(ExecutorManager execManager, SLAManager slaManager, Props props ) throws Exception {
+ ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, slaManager, new JdbcScheduleLoader(props));
return schedManager;
}
- private SLAManager loadSLAManager() {
- SLAManager slaManager = new SLAManager(executorManager, projectManager, new JdbcSLALoader(props));
+ private SLAManager loadSLAManager(Props props) throws SLAManagerException {
+ SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
return slaManager;
}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 437(+289 -148)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 954de95..b5da554 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -18,6 +18,7 @@ package azkaban.webapp.servlet;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,10 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.flow.Node;
import azkaban.project.Project;
@@ -53,12 +57,14 @@ import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
import azkaban.scheduler.Schedule;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.scheduler.ScheduleManager;
-import azkaban.sla.FlowRule;
-import azkaban.sla.JobRule;
import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLAManager;
import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaSetting;
public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
@@ -93,11 +99,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
HashMap<String, Object> ret = new HashMap<String, Object>();
String ajaxName = getParam(req, "ajax");
- if (ajaxName.equals("schedInfo")) {
- ajaxSchedInfo(req, resp, ret, session.getUser());
+ if (ajaxName.equals("slaInfo")) {
+ ajaxSlaInfo(req, ret, session.getUser());
}
else if(ajaxName.equals("setSla")) {
- ajaxSetSla(req, resp, ret, session.getUser());
+ ajaxSetSla(req, ret, session.getUser());
+ }
+ else if(ajaxName.equals("advSchedule")) {
+ ajaxAdvSchedule(req, ret, session.getUser());
}
if (ret != null) {
@@ -105,7 +114,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxSetSla(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) {
+ private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
try {
int projectId = getIntParam(req, "projectId");
@@ -117,26 +126,40 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- String slaEmals = getParam(req, "slaEmails");
- System.out.println(slaEmals);
-
- String flowRules = getParam(req, "flowRules");
- FlowRule flowRule = parseFlowRule(flowRules);
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+
+ SlaOptions slaOptions= new SlaOptions();
+
+ String slaEmails = getParam(req, "slaEmails");
+ System.out.println(slaEmails);
+ String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- List<JobRule> jobRule = new ArrayList<JobRule>();
- Map<String, String> jobRules = getParamGroup(req, "jobRules");
- System.out.println(jobRules);
- for(String job : jobRules.keySet()) {
- JobRule jr = parseJobRule(job, jobRules.get(job));
- jobRule.add(jr);
+ Map<String, String> settings = getParamGroup(req, "settings");
+ System.out.println(settings);
+ List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
+ for(String set : settings.keySet()) {
+ SlaSetting s;
+ try {
+ s = parseSlaSetting(settings.get(set));
+ }
+ catch (Exception e) {
+ throw new ServletException(e);
+ }
+ if(s != null) {
+ slaSettings.add(s);
+ }
}
- Map<String, Object> options= new HashMap<String, Object>();
- options.put("slaEmails", slaEmals);
- options.put("flowRules", flowRules);
- options.put("jobRules", jobRule);
- Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
- //slaManager.addFlowSLA(projectId, project.getName(), flowName, "ready", sched.getFirstSchedTime(), sched.getTimezone(), sched.getPeriod(), DateTime.now(), DateTime.now(), DateTime.now(), user, options);
-
+
+ if(slaSettings.size() > 0) {
+ slaOptions.setSlaEmails(Arrays.asList(emailSplit));
+ slaOptions.setSettings(slaSettings);
+ }
+ else {
+ slaOptions = null;
+ }
+ sched.setSlaOptions(slaOptions);
+ scheduleManager.insertSchedule(sched);
+
} catch (ServletException e) {
ret.put("error", e);
}
@@ -144,67 +167,46 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
- private FlowRule parseFlowRule(String flowRules) {
- String[] parts = flowRules.split(",");
- String duration = parts[0];
- String emailAction = parts[1];
- String killAction = parts[2];
+ private SlaSetting parseSlaSetting(String set) {
+ // "" + Duration + EmailAction + KillAction
+ String[] parts = set.split(",", -1);
+ String id = parts[0];
+ String rule = parts[1];
+ String duration = parts[2];
+ String emailAction = parts[3];
+ String killAction = parts[4];
if(emailAction.equals("on") || killAction.equals("on")) {
- if(!duration.equals("")) {
- FlowRule r = new FlowRule();
- ReadablePeriod dur = parseDuration(duration);
- r.setDuration(dur);
- List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
- if(emailAction.equals("on")) {
- actions.add(SlaAction.SENDEMAIL);
- }
- if(killAction.equals("on")) {
- actions.add(SlaAction.KILL);
- }
- r.setActions(actions);
- return r;
+ SlaSetting r = new SlaSetting();
+ r.setId(id);
+ r.setRule(SlaRule.valueOf(rule));
+ ReadablePeriod dur = parseDuration(duration);
+ r.setDuration(dur);
+ List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+ if(emailAction.equals("on")) {
+ actions.add(SlaAction.EMAIL);
}
- }
- return null;
- }
-
- private JobRule parseJobRule(String job, String jobRule) {
- String[] parts = jobRule.split(",");
- String duration = parts[0];
- String emailAction = parts[1];
- String killAction = parts[2];
- if(emailAction.equals("on") || killAction.equals("on")) {
- if(!duration.equals("")) {
- JobRule r = new JobRule();
- r.setJobId(job);
- ReadablePeriod dur = parseDuration(duration);
- r.setDuration(dur);
- List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
- if(emailAction.equals("on")) {
- actions.add(SlaAction.SENDEMAIL);
- }
- if(killAction.equals("on")) {
- actions.add(SlaAction.KILL);
- }
- r.setActions(actions);
- return r;
+ if(killAction.equals("on")) {
+ actions.add(SlaAction.KILL);
}
- }
+ r.setActions(actions);
+ return r;
+ }
return null;
}
private ReadablePeriod parseDuration(String duration) {
- int hour = Integer.parseInt(duration.split(",")[0]);
- int min = Integer.parseInt(duration.split(",")[1]);
- return Hours.hours(hour).toPeriod().plus(Minutes.minutes(min).toPeriod());
+ int hour = Integer.parseInt(duration.split(":")[0]);
+ int min = Integer.parseInt(duration.split(":")[1]);
+ return Minutes.minutes(min+hour*60).toPeriod();
}
@SuppressWarnings("unchecked")
- private void ajaxSchedInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) {
+ private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
int projId;
+ String flowName;
try {
projId = getIntParam(req, "projId");
- String flowName = getParam(req, "flowName");
+ flowName = getParam(req, "flowName");
Project project = getProjectAjaxByPermission(ret, projId, user, Type.READ);
if (project == null) {
@@ -218,36 +220,52 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- SLA sla = slaManager.getSLA(new Pair<Integer, String>(projId, flowName));
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
- if(sla != null) {
- ret.put("slaEmails", (List<String>)sla.getSlaOptions().get("slaEmails"));
- List<String> allJobs = new ArrayList<String>();
- for(Node n : flow.getNodes()) {
- allJobs.add(n.getId());
- }
- ret.put("allJobs", allJobs);
- if(sla.getFlowRules() != null) {
- ret.put("flowRules", sla.getFlowRules());
+ SlaOptions slaOptions = sched.getSlaOptions();
+ FlowOptions flowOptions = sched.getFlowOptions();
+
+ if(slaOptions != null) {
+ ret.put("slaEmails", slaOptions.getSlaEmails());
+ List<SlaSetting> settings = slaOptions.getSettings();
+ List<Object> setObj = new ArrayList<Object>();
+ for(SlaSetting set: settings) {
+ setObj.add(set.toObject());
}
- if(sla.getJobRules() != null) {
- ret.put("jobRules", sla.getJobRules());
+ ret.put("settings", setObj);
+ }
+ else if (flowOptions != null) {
+ if(flowOptions.getFailureEmails() != null) {
+ List<String> emails = flowOptions.getFailureEmails();
+ if(emails.size() > 0) {
+ ret.put("slaEmails", emails);
+ }
}
}
else {
- ret.put("slaEmails", flow.getFailureEmails());
- List<String> allJobs = new ArrayList<String>();
- Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
- List<String> disabled = sched.getDisabledJobs();
- for(Node n : flow.getNodes()) {
- if(!disabled.contains(n.getId())) {
- allJobs.add(n.getId());
+ if(flow.getFailureEmails() != null) {
+ List<String> emails = flow.getFailureEmails();
+ if(emails.size() > 0) {
+ ret.put("slaEmails", emails);
}
}
- ret.put("allJobs", allJobs);
-
+ }
+
+ List<String> disabledJobs;
+ if(flowOptions != null) {
+ disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+ }
+ else {
+ disabledJobs = new ArrayList<String>();
+ }
+ List<String> allJobs = new ArrayList<String>();
+ for(Node n : flow.getNodes()) {
+ if(!disabledJobs.contains(n.getId())) {
+ allJobs.add(n.getId());
+ }
}
+ ret.put("allJobNames", allJobs);
} catch (ServletException e) {
ret.put("error", e);
}
@@ -277,45 +295,38 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
List<Schedule> schedules = scheduleManager.getSchedules();
page.add("schedules", schedules);
-
- List<SLA> slas = slaManager.getSLAs();
- page.add("slas", slas);
+//
+// List<SLA> slas = slaManager.getSLAs();
+// page.add("slas", slas);
page.render();
}
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- HashMap<String, Object> ret = new HashMap<String, Object>();
- if (hasParam(req, "action")) {
- String action = getParam(req, "action");
- if (action.equals("scheduleFlow")) {
- ajaxScheduleFlow(req, ret, session.getUser());
- }
- else if(action.equals("removeSched")){
- ajaxRemoveSched(req, ret, session.getUser());
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
+ else {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ if (hasParam(req, "action")) {
+ String action = getParam(req, "action");
+ if (action.equals("scheduleFlow")) {
+ ajaxScheduleFlow(req, ret, session.getUser());
+ }
+ else if(action.equals("removeSched")){
+ ajaxRemoveSched(req, ret, session.getUser());
+ }
}
+
+ if(ret.get("status") == ("success"))
+ setSuccessMessageInCookie(resp, (String) ret.get("message"));
+ else
+ setErrorMessageInCookie(resp, (String) ret.get("message"));
+
+ this.writeJSON(resp, ret);
}
-
- if(ret.get("status") == ("success"))
- setSuccessMessageInCookie(resp, (String) ret.get("message"));
- else
- setErrorMessageInCookie(resp, (String) ret.get("message"));
-
- this.writeJSON(resp, ret);
}
-
-// private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-// HashMap<String, Object> ret = new HashMap<String, Object>();
-// String ajaxName = getParam(req, "ajax");
-//
-// if (ajaxName.equals("scheduleFlow")) {
-// ajaxScheduleFlow(req, ret, session.getUser());
-// }
-//// }
-// this.writeJSON(resp, ret);
-// }
-//
private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
int projectId = getIntParam(req, "projectId");
@@ -406,40 +417,170 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
hour += 12;
hour %= 24;
-// String submitUser = user.getUserId();
-// String userExec = userSubmit;//getParam(req, "userExec");
-// String scheduleId = projectId + "." + flowName;
DateTime submitTime = new DateTime();
DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
- //ScheduledFlow schedFlow = scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
- //project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
- Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), null);
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+ FlowOptions flowOptions = null;
+ SlaOptions slaOptions = null;
+ if(sched != null) {
+ if(sched.getFlowOptions() != null) {
+ flowOptions = sched.getFlowOptions();
+ }
+ if(sched.getSlaOptions() != null) {
+ slaOptions = sched.getSlaOptions();
+ }
+ }
+ Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
ret.put("status", "success");
ret.put("message", projectName + "." + flowName + " scheduled.");
}
-
-// private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
-// int period = getIntParam(req, "period");
-// String periodUnits = getParam(req, "period_units");
-// if("M".equals(periodUnits))
-// return Months.months(period);
-// else if("w".equals(periodUnits))
-// return Weeks.weeks(period);
-// else if("d".equals(periodUnits))
-// return Days.days(period);
-// else if("h".equals(periodUnits))
-// return Hours.hours(period);
-// else if("m".equals(periodUnits))
-// return Minutes.minutes(period);
-// else if("s".equals(periodUnits))
-// return Seconds.seconds(period);
-// else
-// throw new ServletException("Unknown period unit: " + periodUnits);
-// }
+
+ private void ajaxAdvSchedule(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+ String projectName = getParam(req, "projectName");
+ String flowName = getParam(req, "flowName");
+ int projectId = getIntParam(req, "projectId");
+
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ ret.put("message", "Project " + projectName + " does not exist");
+ ret.put("status", "error");
+ return;
+ }
+
+ if (!hasPermission(project, user, Type.SCHEDULE)) {
+ ret.put("status", "error");
+ ret.put("message", "Permission denied. Cannot execute " + flowName);
+ return;
+ }
+
+ Flow flow = project.getFlow(flowName);
+ if (flow == null) {
+ ret.put("status", "error");
+ ret.put("message", "Flow " + flowName + " cannot be found in project " + project);
+ return;
+ }
+
+ String scheduleTime = getParam(req, "scheduleTime");
+ String scheduleDate = getParam(req, "scheduleDate");
+ DateTime firstSchedTime;
+ try {
+ firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
+ }
+ catch (Exception e) {
+ ret.put("error", "Invalid date and/or time '" + scheduleDate + " " + scheduleTime);
+ return;
+ }
+
+ ReadablePeriod thePeriod = null;
+ try {
+ if(hasParam(req, "is_recurring") && getParam(req, "is_recurring").equals("on")) {
+ thePeriod = Schedule.parsePeriodString(getParam(req, "period"));
+ }
+ }
+ catch(Exception e){
+ ret.put("error", e.getMessage());
+ }
+
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+ FlowOptions flowOptions = null;
+ try {
+ flowOptions = parseFlowOptions(req);
+ }
+ catch (Exception e) {
+ ret.put("error", e.getMessage());
+ }
+ SlaOptions slaOptions = null;
+ if(sched != null) {
+ if(sched.getSlaOptions() != null) {
+ slaOptions = sched.getSlaOptions();
+ }
+ }
+ Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+ logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
+ projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
+
+ ret.put("status", "success");
+ ret.put("message", projectName + "." + flowName + " scheduled.");
+ }
+
+ private FlowOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
+ FlowOptions flowOptions = new FlowOptions();
+ if (hasParam(req, "failureAction")) {
+ String option = getParam(req, "failureAction");
+ if (option.equals("finishCurrent") ) {
+ flowOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+ }
+ else if (option.equals("cancelImmediately")) {
+ flowOptions.setFailureAction(FailureAction.CANCEL_ALL);
+ }
+ else if (option.equals("finishPossible")) {
+ flowOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+ }
+ }
+
+ if (hasParam(req, "failureEmails")) {
+ String emails = getParam(req, "failureEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ flowOptions.setFailureEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "successEmails")) {
+ String emails = getParam(req, "successEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ flowOptions.setSuccessEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "notifyFailureFirst")) {
+ flowOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+ }
+ if (hasParam(req, "notifyFailureLast")) {
+ flowOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+ }
+ if (hasParam(req, "executingJobOption")) {
+ //String option = getParam(req, "jobOption");
+ // Not set yet
+ }
+
+ Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
+ flowOptions.setFlowOverride(flowParamGroup);
+
+ if (hasParam(req, "disabledJobs")) {
+ String disable = getParam(req, "disabledJobs");
+ String[] disableSplit = disable.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ List<String> jobs = (List<String>) Arrays.asList(disableSplit);
+ flowOptions.setDisabledJobs(jobs.subList(1, jobs.size()));
+ }
+ return flowOptions;
+ }
+
+ private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
+ // scheduleTime: 12,00,pm,PDT
+ String[] parts = scheduleTime.split(",", -1);
+ int hour = Integer.parseInt(parts[0]);
+ int minutes = Integer.parseInt(parts[1]);
+ boolean isPm = parts[2].equalsIgnoreCase("pm");
+
+ DateTimeZone timezone = parts[3].equals("UTC") ? DateTimeZone.UTC : DateTimeZone.forID("America/Los_Angeles");
+
+ // scheduleDate: 02/10/2013
+ DateTime day = null;
+ if(scheduleDate == null || scheduleDate.trim().length() == 0) {
+ day = new LocalDateTime().toDateTime();
+ } else {
+ day = DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timezone).parseDateTime(scheduleDate);
+ }
+
+ if(isPm && hour < 12)
+ hour += 12;
+ hour %= 24;
+
+ DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
+
+ return firstSchedTime;
+ }
private boolean hasPermission(Project project, User user, Permission.Type type) {
if (project.hasPermission(user, type)) {
src/java/azkaban/webapp/servlet/velocity/flowpage.vm 183(+23 -160)
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 0f8ddce..102c43f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -34,6 +34,7 @@
<script type="text/javascript" src="${context}/js/azkaban.flow.graph.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.exflow.options.view.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.schedule.options.view.js"></script>
<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
@@ -131,165 +132,7 @@
</div>
</div>
<!-- modal content -->
- <div id="advScheduleModalBackground" class="modalBackground2">
- <div id="schedule-options" class="modal modalContainer2">
- <a href='#' title='Close' class='modal-close'>x</a>
- <h3>Advanced Schedule Options</h3>
- <div>
- <ul class="optionsPicker">
- <li id="scheduleGeneralOptions">General Options</li>
- <li id="scheduleFlowOptions">Flow Options</li>
- <li id="scheduleSlaOptions">SLA Options</li>
- </ul>
- </div>
- <div class="optionsPane">
- <div id="scheduleSlaPanel" class="generalPanel panel">
- <div id="slaActions">
- <h4>SLA Alert Emails</h4>
- <dl>
- <dt >SLA Alert Emails</dt>
- <dd>
- <textarea id="slaEmails"></textarea>
- </dd>
- </dl>
- </div>
- <div id="slaRules">
- <h4>Flow SLA Rules</h4>
- <div class="tableDiv">
- <table id="flowRulesTbl">
- <thead>
- <tr>
- <th>Flow/Job</th>
- <th>Finish In</th>
- <th>Email Action</th>
- <th>Kill Action</th>
- </tr>
- </thead>
- <tbody>
- </tbody>
- </table>
- </div>
- <h4>Job SLA Rules</h4>
- <div class="tableDiv">
- <table id="jobRulesTbl">
- <thead>
- <tr>
- <th>Flow/Job</th>
- <th>Finish In</th>
- <th>Email Action</th>
- <th>Kill Action</th>
- </tr>
- </thead>
- <tbody>
- </tbody>
- </table>
- </div>
- </div>
- </div>
- <div id="scheduleGeneralPanel" class="generalPanel panel">
- <div id="scheduleBasicInfo">
- <h4>Basic Scheduling Information</h4>
- <dl>
- <dt>Schedule Time</dt>
- <dd>
- <input id="advHour" type="text" size="2" value="12"/>
- <input id="advMinutes" type="text" size="2" value="00"/>
- <select id="advAm_pm">
- <option>pm</option>
- <option>am</option>
- </select>
- <select id="advTimezone">
- <option>PDT</option>
- <option>UTC</option>
- </select>
- </dd>
- <dt>Schedule Date</dt>
- <dd><input type="text" id="advDate" /></dd>
- <dt>Recurrence</dt>
- <dd>
- <input id="advIs_recurring" type="checkbox" checked /><span>repeat every</span>
- <input id="advPeriod" type="text" size="2" value="1"/>
- <select id="advPeriod_units">
- <option value="d">Days</option>
- <option value="h">Hours</option>
- <option value="m">Minutes</option>
- <option value="M">Months</option>
- <option value="w">Weeks</option>
- </select>
- </dd>
- </dl>
- </div>
- <div id="scheduleCompleteActions">
- <h4>Completion Actions</h4>
- <dl>
- <dt class="disabled">Failure Action</dt>
- <dd>
- <select id="scheduleFailureAction" name="failureAction">
- <option value="finishCurrent">Finish Current Running</option>
- <option value="cancelImmediately">Cancel All</option>
- <option value="finishPossible">Finish All Possible</option>
- </select>
- </dd>
- <dt>Failure Email</dt>
- <dd>
- <textarea id="scheduleFailureEmails"></textarea>
- </dd>
- <dt>Notify on Failure</dt>
- <dd>
- <input id="scheduleNotifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
- <input id="scheduleNotifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
- </dd>
- <dt>Success Email</dt>
- <dd>
- <textarea id="scheduleSuccessEmails"></textarea>
- </dd>
- <dt class="disabled" >Concurrent Execution</dt>
- <dd id="scheduleExecutingJob" class="disabled">
- <input id="scheduleIgnore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
- <input id="schedulePipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
- <input id="scheduleQueue" class="radio" type="radio" name="concurrent" value="queue" /><label class="radioLabel" for="queue">Queue Job</label>
- </dd>
- </dl>
- </div>
- <div id="scheduleFlowPropertyOverride">
- <h4>Flow Property Override</h4>
- <div class="tableDiv">
- <table>
- <thead>
- <tr>
- <th>Name</th>
- <th>Value</th>
- </tr>
- </thead>
- <tbody>
- <tr id="scheduleAddRow"><td id="scheduleAddRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
- </tbody>
- </table>
- </div>
- </div>
- </div>
- <div id="scheduleGraphPanel" class="graphPanel panel">
- <div id="scheduleJobListCustom" class="jobList">
- <div class="filterList">
- <input class="filter" placeholder=" Job Filter" />
- </div>
- <div class="list">
- </div>
- <div class="btn5 resetPanZoomBtn" >Reset Pan Zoom</div>
- </div>
- <div id="scheduleSvgDivCustom" class="svgDiv" >
- <svg class="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
- </svg>
- </div>
- </div>
- </div>
- <div class="actions">
- <a class="yes btn1" id="adv-schedule-btn" href="#">Schedule</a>
- <a class="no simplemodal-close btn3" id="schedule-cancel-btn" href="#">Cancel</a>
- </div>
- </div>
- </div>
-
+
<div id="schedule-flow" class="modal">
<h3>Schedule Flow</h3>
<div id="errorMsg" class="box-error-message">$errorMsg</div>
@@ -345,7 +188,7 @@
<a class="yes btn3" id="login-btn" href="#">Re-login</a>
</div>
</div>
-
+#parse( "azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm" )
#parse( "azkaban/webapp/servlet/velocity/executionoptionspanel.vm" )
@@ -375,6 +218,26 @@
</ul>
</ul>
+ <ul id="scheduleDisableJobMenu" class="contextMenu flowSubmenu">
+ <li class="openwindow"><a href="#scheduleOpenwindow">Open in New Window...</a></li>
+ <li id="scheduleDisable" class="disable separator"><a href="#disable">Disable</a><div id="scheduleDisableArrow" class="context-sub-icon"></div></li>
+ <ul id="scheduleDisableSub" class="subMenu">
+ <li class="disableAll"><a href="#disableAll">All</a></li>
+ <li class="parents"><a href="#disableParents">Parents</a></li>
+ <li class="ancestors"><a href="#disableAncestors">All Ancestors</a></li>
+ <li class="children"><a href="#disableChildren">Children</a></li>
+ <li class="decendents"><a href="#disableDescendents">All Descendents</a></li>
+ </ul>
+ <li id="scheduleEnable" class="enable"><a href="#enable">Enable</a> <div id="scheduleEnableArrow" class="context-sub-icon"></div></li>
+ <ul id="scheduleEnableSub" class="subMenu">
+ <li class="enableAll"><a href="#enableAll">All</a></li>
+ <li class="parents"><a href="#enableParents">Parents</a></li>
+ <li class="ancestors"><a href="#enableAncestors">All Ancestors</a></li>
+ <li class="children"><a href="#enableChildren">Children</a></li>
+ <li class="decendents"><a href="#enableDescendents">All Descendents</a></li>
+ </ul>
+ </ul>
+
</div>
</body>
</html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index b2511ed..64d5a9d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -68,11 +68,11 @@
<!--th class="execid">Execution Id</th-->
<th>Flow</th>
<th>Project</th>
- <th>User</th>
<th>Submitted By</th>
<th class="date">First Scheduled to Run</th>
<th class="date">Next Execution Time</th>
<th class="date">Repeats Every</th>
+ <th>Has SLA</th>
<th colspan="2" class="action">Action</th>
</tr>
</thead>
@@ -88,10 +88,10 @@
<a href="${context}/manager?project=${sched.projectName}">${sched.projectName}</a>
</td>
<td>${sched.submitUser}</td>
- <td>${sched.submitUser}</td>
<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
<td>$utils.formatDateTime(${sched.nextExecTime})</td>
<td>$utils.formatPeriod(${sched.period})</td>
+ <td>#if(${sched.slaOptions}) true #else false #end</td>
<td><button id="removeSchedBtn" onclick="removeSched(${sched.projectId}, '${sched.flowName}')" >Remove Schedule</button></td>
<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.projectId}, '${sched.flowName}')" >Set SLA</button></td>
</tr>
@@ -103,54 +103,6 @@
</table>
</div>
-
- <div id="all-sla-content">
- <div class="section-hd">
- <h2>Flow SLAs</h2>
- </div>
- </div>
-
- <div class="scheduledFlows">
- <table id="slaTbl">
- <thead>
- <tr>
- <th>Flow</th>
- <th>Project</th>
- <th>User</th>
- <th>Submitted By</th>
- <th class="date">First SLA check Time</th>
- <th class="date">Next SLA check Time</th>
- <th class="date">Repeats Every</th>
- <th colspan="2" class="action">Action</th>
- </tr>
- </thead>
- <tbody>
- #if($slas)
-#foreach($sla in $slas)
- <tr class="row" >
-
- <td class="tb-name">
- <a href="${context}/manager?project=${sla.projectName}&flow=${sla.flowName}">${sla.flowName}</a>
- </td>
- <td>
- <a href="${context}/manager?project=${sla.projectName}">${sla.projectName}</a>
- </td>
- <td>${sla.submitUser}</td>
- <td>${sla.submitUser}</td>
- <td>$utils.formatDateTime(${sla.firstCheckTime})</td>
- <td>$utils.formatDateTime(${sla.nextCheckTime})</td>
- <td>$utils.formatPeriod(${sla.period})</td>
- <td><button id="removeSchedBtn" onclick="removeSla(${sla.projectId}, '${sla.flowName}')" >Remove SLA</button></td>
- </tr>
-#end
-#else
- <tr><td class="last">No Flow SLA Set</td></tr>
-#end
- </tbody>
- </table>
- </div>
- </div>
-
<!-- modal content -->
<div id="slaModalBackground" class="modalBackground2">
@@ -163,7 +115,7 @@
</ul>
</div>
<div class="optionsPane">
- <div id="slaPanel" class="generalPanel panel">
+ <div id="generalPanel" class="generalPanel panel">
<div id="slaActions">
<h4>SLA Alert Emails</h4>
<dl>
@@ -173,6 +125,7 @@
</dd>
</dl>
</div>
+ <br></br>
<div id="slaRules">
<h4>Flow SLA Rules</h4>
<div class="tableDiv">
@@ -180,36 +133,40 @@
<thead>
<tr>
<th>Flow/Job</th>
- <th>Finish In</th>
+ <th>Sla Rule</th>
+ <th>Duration</th>
<th>Email Action</th>
<th>Kill Action</th>
</tr>
</thead>
<tbody>
+ <tr id="addRow"><td id="addRow-col" colspan="5"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
</tbody>
</table>
</div>
- <h4>Job SLA Rules</h4>
- <div class="tableDiv">
+ <!--h4 style="visibility: hidden">Job SLA Rules</h4>
+ <div class="tableDiv" style="visibility: hidden">
<table id="jobRulesTbl">
<thead>
<tr>
- <th>Flow/Job</th>
+ <th>Flow/Job Id</th>
<th>Finish In</th>
+ <th>Duration</th>
<th>Email Action</th>
<th>Kill Action</th>
</tr>
</thead>
<tbody>
+ <tr id="addRow"><td id="addRow-col" colspan="4"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
</tbody>
</table>
- </div>
+ </div-->
</div>
</div>
</div>
<div class="actions">
- <a class="yes btn1" id="remove-sla-btn" href="#">Remove SLA</a>
- <a class="yes btn1" id="set-sla-btn" href="#">Set SLA</a>
+ <!--a class="yes btn1" id="remove-sla-btn" href="#">Remove SLA</a-->
+ <a class="yes btn1" id="set-sla-btn" href="#">Set/Change SLA</a>
<a class="no simplemodal-close btn3" id="sla-cancel-btn" href="#">Cancel</a>
</div>
</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
new file mode 100644
index 0000000..fc6bc91
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
@@ -0,0 +1,159 @@
+<div id="scheduleModalBackground" class="modalBackground2">
+ <div id="schedule-options" class="modal modalContainer2">
+ <a href='#' title='Close' class='modal-close'>x</a>
+ <h3>Schedule Flow Options</h3>
+ <div>
+ <ul class="optionsPicker">
+ <li id="scheduleGeneralOptions">General Options</li>
+ <li id="scheduleFlowOptions">Flow Options</li>
+ <!--li id="scheduleSlaOptions">SLA Options</li-->
+ </ul>
+ </div>
+ <div class="optionsPane">
+ <!--div id="scheduleSlaPanel" class="generalPanel panel">
+ <div id="slaActions">
+ <h4>SLA Alert Emails</h4>
+ <dl>
+ <dt >SLA Alert Emails</dt>
+ <dd>
+ <textarea id="slaEmails"></textarea>
+ </dd>
+ </dl>
+ </div>
+ <div id="slaRules">
+ <h4>Flow SLA Rules</h4>
+ <div class="tableDiv">
+ <table id="flowRulesTbl">
+ <thead>
+ <tr>
+ <th>Flow/Job</th>
+ <th>Finish In</th>
+ <th>Email Action</th>
+ <th>Kill Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ </tbody>
+ </table>
+ </div>
+ <h4>Job SLA Rules</h4>
+ <div class="tableDiv">
+ <table id="jobRulesTbl">
+ <thead>
+ <tr>
+ <th>Flow/Job</th>
+ <th>Finish In</th>
+ <th>Email Action</th>
+ <th>Kill Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div-->
+ <div id="scheduleGeneralPanel" class="generalPanel panel">
+ <div id="scheduleInfo">
+ <dl>
+ <dt>Schedule Time</dt>
+ <dd>
+ <input id="advhour" type="text" size="2" value="12"/>
+ <input id="advminutes" type="text" size="2" value="00"/>
+ <select id="advam_pm">
+ <option>pm</option>
+ <option>am</option>
+ </select>
+ <select id="advtimezone">
+ <option>PDT</option>
+ <option>UTC</option>
+ </select>
+ </dd>
+ <dt>Schedule Date</dt><dd><input type="text" id="advdatepicker" /></dd>
+ <dt>Recurrence</dt>
+ <dd>
+ <input id="advis_recurring" type="checkbox" checked />
+ <span>repeat every</span>
+ <input id="advperiod" type="text" size="2" value="1"/>
+ <select id="advperiod_units">
+ <option value="d">Days</option>
+ <option value="h">Hours</option>
+ <option value="m">Minutes</option>
+ <option value="M">Months</option>
+ <option value="w">Weeks</option>
+ </select>
+ </dd>
+ </dl>
+ </div>
+ <br></br>
+ <br></br>
+ <div id="scheduleCompleteActions">
+ <h4>Completion Actions</h4>
+ <dl>
+ <dt>Failure Action</dt>
+ <dd>
+ <select id="scheduleFailureAction" name="failureAction">
+ <option value="finishCurrent">Finish Current Running</option>
+ <option value="cancelImmediately">Cancel All</option>
+ <option value="finishPossible">Finish All Possible</option>
+ </select>
+ </dd>
+ <dt>Failure Email</dt>
+ <dd>
+ <textarea id="scheduleFailureEmails"></textarea>
+ </dd>
+ <dt>Notify on Failure</dt>
+ <dd>
+ <input id="scheduleNotifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
+ <input id="scheduleNotifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
+ </dd>
+ <dt>Success Email</dt>
+ <dd>
+ <textarea id="scheduleSuccessEmails"></textarea>
+ </dd>
+ <dt>Concurrent Execution</dt>
+ <dd id="scheduleExecutingJob" class="disabled">
+ <input id="scheduleIgnore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
+ <input id="schedulePipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
+ <input id="scheduleQueue" class="radio" type="radio" name="concurrent" value="queue" /><label class="radioLabel" for="queue">Queue Job</label>
+ </dd>
+ </dl>
+ </div>
+ <div id="scheduleFlowPropertyOverride">
+ <h4>Flow Property Override</h4>
+ <div class="tableDiv">
+ <table>
+ <thead>
+ <tr>
+ <th>Name</th>
+ <th>Value</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr id="scheduleAddRow"><td id="addRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+ <div id="scheduleGraphPanel" class="graphPanel panel">
+ <div id="scheduleJobListCustom" class="jobList">
+ <div class="filterList">
+ <input class="filter" placeholder=" Job Filter" />
+ </div>
+ <div class="list">
+ </div>
+ <div class="btn5 resetPanZoomBtn" >Reset Pan Zoom</div>
+ </div>
+ <div id="scheduleSvgDivCustom" class="svgDiv" >
+ <svg class="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
+ </svg>
+ </div>
+ </div>
+ </div>
+ <div class="actions">
+ <a class="yes btn1" id="adv-schedule-btn" href="#">Schedule</a>
+ <a class="no simplemodal-close btn3" id="schedule-cancel-btn" href="#">Cancel</a>
+ </div>
+ </div>
+</div>
\ No newline at end of file
src/sql/create_sla_table.sql 10(+10 -0)
diff --git a/src/sql/create_sla_table.sql b/src/sql/create_sla_table.sql
new file mode 100644
index 0000000..9914a03
--- /dev/null
+++ b/src/sql/create_sla_table.sql
@@ -0,0 +1,10 @@
+DROP TABLE if exists active_sla;
+CREATE TABLE active_sla (
+ exec_id INT NOT NULL,
+ job_name VARCHAR(128) NOT NULL,
+ check_time BIGINT NOT NULL,
+ rule TINYINT NOT NULL,
+ enc_type TINYINT,
+ options LONGBLOB NOT NULL,
+ primary key(exec_id, job_name, check_time, rule)
+) ENGINE=InnoDB;
src/web/css/azkaban.css 280(+275 -5)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index a29f9a1..67d3287 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1346,10 +1346,6 @@ tr:hover td {
bottom: 0px;
}
-#graphPanel {
- background-color: #F0F0F0;
-}
-
.radioLabel.disabled {
opacity: 0.3;
}
@@ -1443,6 +1439,189 @@ tr:hover td {
border-bottom: 1px solid #CCC;
}
+#sla-options {
+ left: 100px;
+ right: 100px;
+ top: 50px;
+ bottom: 40px;
+}
+
+#sla-options .svgDiv {
+ position: absolute;
+ background-color: #CCC;
+ padding: 1px;
+ left: 270px;
+ right: 0px;
+ top: 0px;
+ bottom: 0px;
+}
+
+#sla-options .jobList {
+ position: absolute;
+ width: 255px;
+ top: 0px;
+ bottom: 0px;
+ padding: 5px;
+ background-color: #F0F0F0;
+}
+
+#sla-options .list {
+ width: 255px;
+}
+
+#sla-options ul.optionsPicker {
+ margin-left: 30px;
+}
+
+#sla-options ul.optionsPicker li {
+ float: left;
+ font-size: 12pt;
+ font-weight: bold;
+ margin-right: 15px;
+ cursor: pointer;
+ color: #CCC;
+}
+
+#sla-options ul.optionsPicker li.selected {
+ text-decoration: underline;
+ color: #000;
+}
+
+#sla-options ul.optionsPicker li.selected:hover {
+ color: #000;
+}
+
+#sla-options ul.optionsPicker li:hover {
+ color: #888;
+}
+
+#sla-options .optionsPane {
+ position: absolute;
+ top: 85px;
+ background-color: #FFF;
+ left: 0px;
+ right: 0px;
+ bottom: 0px;
+}
+
+#sla-options .panel {
+ position: absolute;
+ width: 100%;
+ top: 0px;
+ bottom: 65px;
+}
+
+#sla-options .generalPanel.panel {
+ background-color: #F4F4F4;
+ padding-top: 15px;
+}
+
+#sla-options h3 {
+ margin-left: 20px;
+ font-size: 14pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#sla-options h4 {
+ margin-left: 20px;
+ font-size: 12pt;
+ border-bottom: 1px solid #CCC;
+}
+
+
+#schedule-options {
+ left: 100px;
+ right: 100px;
+ top: 50px;
+ bottom: 40px;
+}
+
+#schedule-options .svgDiv {
+ position: absolute;
+ background-color: #CCC;
+ padding: 1px;
+ left: 270px;
+ right: 0px;
+ top: 0px;
+ bottom: 0px;
+}
+
+#schedule-options .jobList {
+ position: absolute;
+ width: 255px;
+ top: 0px;
+ bottom: 0px;
+ padding: 5px;
+ background-color: #F0F0F0;
+}
+
+#schedule-options .list {
+ width: 255px;
+}
+
+#schedule-options ul.optionsPicker {
+ margin-left: 30px;
+}
+
+#schedule-options ul.optionsPicker li {
+ float: left;
+ font-size: 12pt;
+ font-weight: bold;
+ margin-right: 15px;
+ cursor: pointer;
+ color: #CCC;
+}
+
+#schedule-options ul.optionsPicker li.selected {
+ text-decoration: underline;
+ color: #000;
+}
+
+#schedule-options ul.optionsPicker li.selected:hover {
+ color: #000;
+}
+
+#schedule-options ul.optionsPicker li:hover {
+ color: #888;
+}
+
+#schedule-options .optionsPane {
+ position: absolute;
+ top: 85px;
+ background-color: #FFF;
+ left: 0px;
+ right: 0px;
+ bottom: 0px;
+}
+
+#schedule-options .panel {
+ position: absolute;
+ width: 100%;
+ top: 0px;
+ bottom: 65px;
+}
+
+#schedule-options .generalPanel.panel {
+ background-color: #F4F4F4;
+ padding-top: 15px;
+}
+
+#schedule-options h3 {
+ margin-left: 20px;
+ font-size: 14pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#schedule-options h4 {
+ margin-left: 20px;
+ font-size: 12pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#graphPanel {
+ background-color: #F0F0F0;
+}
+
#generalPanel {
overflow: auto;
}
@@ -1548,6 +1727,97 @@ tr:hover td {
top: 100px;
}
+#scheduleGraphPanel {
+ background-color: #F0F0F0;
+}
+
+#scheduleGeneralPanel {
+ overflow: auto;
+}
+
+#scheduleGeneralPanel dt {
+ width: 150px;
+ font-size: 10pt;
+ font-weight: bold;
+ margin-top: 5px;
+}
+
+#scheduleGeneralPanel textarea {
+ width: 500px;
+}
+
+#scheduleGeneralPanel table #addRow {
+ cursor: pointer;
+}
+
+#scheduleGeneralPanel table tr {
+ height: 24px;
+}
+
+#scheduleGeneralPanel table .editable {
+
+}
+
+#scheduleGeneralPanel table .editable input {
+ border: 1px solid #009FC9;
+ height: 16px;
+}
+
+#scheduleGeneralPanel table .name {
+ width: 40%;
+}
+
+#scheduleGeneralPanel span.addIcon {
+ display: block;
+ width: 16px;
+ height: 16px;
+ background-image: url("./images/addIcon.png");
+}
+
+#scheduleGeneralPanel span.removeIcon {
+ display: block;
+ visibility:hidden;
+ disabled: true;
+ width: 16px;
+ height: 16px;
+ background-image: url("./images/removeIcon.png");
+ cursor: pointer;
+}
+
+#scheduleGeneralPanel .editable:hover span.removeIcon {
+ visibility:visible;
+}
+
+#scheduleGeneralPanel {
+}
+
+#scheduleGeneralPanel span {
+ float: left;
+ margin-left: 5px;
+}
+
+#scheduleGeneralPanel dd {
+ font-size: 10pt;
+}
+
+#scheduleFlowPropertyOverride {
+ clear: both;
+ padding-top: 30px;
+}
+
+#scheduleFlowPropertyOverride .tableDiv {
+ padding-right: 20px;
+ padding-left: 20px;
+}
+
+#scheduleJobList {
+ position: absolute;
+ top: 0px;
+ left: 0px;
+ height: 100%;
+ width: 250px;
+}
+
.filter {
width: 100%;
}
@@ -1577,7 +1847,7 @@ tr:hover td {
background-position: 16px 0px;
}
-.list ul li:hover{
+.list ul li:hover {
background-color: #E1E3E2;
color: #009FC9;
}
src/web/js/azkaban.exflow.options.view.js 453(+16 -437)
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index 2a036d8..6c631bb 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -15,11 +15,9 @@
*/
var executeFlowView;
-var advancedScheduleView;
var customSvgGraphView;
var customJobListView;
var cloneModel;
-var scheduleModel;
function recurseAllAncestors(nodes, disabledMap, id, disable) {
var node = nodes[id];
@@ -88,425 +86,6 @@ azkaban.ContextMenu = Backbone.View.extend({
}
});
-azkaban.AdvancedScheduleView = Backbone.View.extend({
- events : {
- "click" : "closeEditingTarget",
- "click #adv-schedule-btn": "handleScheduleFlow",
- "click #schedule-cancel-btn": "handleCancel",
- "click .modal-close": "handleCancel",
- "click #scheduleGeneralOptions": "handleGeneralOptionsSelect",
- "click #scheduleFlowOptions": "handleFlowOptionsSelect",
- "click #scheduleSlaOptions": "handleSlaOptionsSelect",
- "click #scheduleAddRow": "handleAddRow",
- "click table .editable": "handleEditColumn",
- "click table .removeIcon": "handleRemoveColumn"
- },
- initialize: function(setting) {
- this.contextMenu = new azkaban.ContextMenu({el:$('#disableJobMenu')});
- this.handleGeneralOptionsSelect();
- },
- show: function() {
- this.handleGeneralOptionsSelect();
- $('#advScheduleModalBackground').show();
- $('#schedule-options').show();
- this.cloneModel = this.model.clone();
- scheduleModel = this.cloneModel;
-
- var fetchData = {"project": projectName, "ajax":"flowInfo", "flow":flowName};
-// if (execId) {
-// fetchData.execid = execId;
-// }
- this.executeURL = contextURL + "/executor";
- this.scheduleURL = contextURL + "/schedule";
- var handleAddRow = this.handleAddRow;
-
- var data = this.cloneModel.get("data");
- var nodes = {};
- for (var i=0; i < data.nodes.length; ++i) {
- var node = data.nodes[i];
- nodes[node.id] = node;
- }
-
- for (var i=0; i < data.edges.length; ++i) {
- var edge = data.edges[i];
- var fromNode = nodes[edge.from];
- var toNode = nodes[edge.target];
-
- if (!fromNode.outNodes) {
- fromNode.outNodes = {};
- }
- fromNode.outNodes[toNode.id] = toNode;
-
- if (!toNode.inNodes) {
- toNode.inNodes = {};
- }
- toNode.inNodes[fromNode.id] = fromNode;
- }
- this.cloneModel.set({nodes: nodes});
-
- var disabled = {};
-// for (var i = 0; i < data.nodes.length; ++i) {
-// var updateNode = data.nodes[i];
-// if (updateNode.status == "DISABLED" || updateNode.status == "SKIPPED") {
-// updateNode.status = "READY";
-// disabled[updateNode.id] = true;
-// }
-// if (updateNode.status == "SUCCEEDED") {
-// disabled[updateNode.id] = true;
-// }
-// }
- this.cloneModel.set({disabled: disabled});
-
- $.get(
- this.executeURL,
- fetchData,
- function(data) {
- if (data.error) {
- alert(data.error);
- }
- else {
- if (data.successEmails) {
- $('#scheduleSuccessEmails').val(data.successEmails.join());
- }
- if (data.failureEmails) {
- $('#scheduleFailureEmails').val(data.failureEmails.join());
- }
-
- if (data.failureAction) {
- $('#scheduleFailureAction').val(data.failureAction);
- }
- if (data.notifyFailureFirst) {
- $('#scheduleNotifyFailureFirst').attr('checked', true);
- }
- if (data.notifyFailureLast) {
- $('#scheduleNotifyFailureLast').attr('checked', true);
- }
- if (data.flowParam) {
- var flowParam = data.flowParam;
- for (var key in flowParam) {
- var row = handleAddRow();
- var td = $(row).find('td');
- $(td[0]).text(key);
- $(td[1]).text(flowParam[key]);
- }
- }
-
- if (!data.running || data.running.length == 0) {
- $(".radio").attr("disabled", "disabled");
- $(".radioLabel").addClass("disabled", "disabled");
- }
- }
- },
- "json"
- );
- },
- handleCancel: function(evt) {
- var scheduleURL = contextURL + "/schedule";
- $('#advScheduleModalBackground').hide();
- $('#schedule-options').hide();
- },
- handleGeneralOptionsSelect: function(evt) {
- $('#scheduleFlowOptions').removeClass('selected');
- $('#scheduleSlaOptions').removeClass('selected');
- $('#scheduleGeneralOptions').addClass('selected');
-
- $('#scheduleGeneralPanel').show();
- $('#scheduleGraphPanel').hide();
- $('#scheduleSlaPanel').hide();
- },
- handleSlaOptionsSelect: function(evt) {
- $('#scheduleFlowOptions').removeClass('selected');
- $('#scheduleSlaOptions').addClass('selected');
- $('#scheduleGeneralOptions').removeClass('selected');
-
- $('#scheduleSlaPanel').show();
- $('#scheduleGraphPanel').hide();
- $('#scheduleGeneralPanel').hide();
- },
- handleFlowOptionsSelect: function(evt) {
- $('#scheduleGeneralOptions').removeClass('selected');
- $('#scheduleFlowOptions').addClass('selected');
- $('#scheduleSlaOptions').removeClass('selected');
-
- $('#scheduleGraphPanel').show();
- $('#scheduleGeneralPanel').hide();
- $('#scheduleSlaPanel').hide();
-
- if (this.flowSetup) {
- return;
- }
-
- customSvgGraphView = new azkaban.SvgGraphView({el:$('#scheduleSvgDivCustom'), model: this.cloneModel, rightClick: {id: 'disableJobMenu', callback: this.handleDisableMenuClick}});
- customJobsListView = new azkaban.JobListView({el:$('#scheduleJobListCustom'), model: this.cloneModel, rightClick: {id: 'disableJobMenu', callback: this.handleDisableMenuClick}});
- this.cloneModel.trigger("change:graph");
-
- this.flowSetup = true;
- },
- handleScheduleFlow: function(evt) {
- var scheduleURL = contextURL + "/scheduler";
- var disabled = this.cloneModel.get("disabled");
- var failureAction = $('#failureAction').val();
- var failureEmails = $('#failureEmails').val();
- var successEmails = $('#successEmails').val();
- var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
- var notifyFailureLast = $('#notifyFailureLast').is(':checked');
- var executingJobOption = $('input:radio[name=gender]:checked').val();
-
- var flowOverride = {};
- var editRows = $(".editRow");
- for (var i = 0; i < editRows.length; ++i) {
- var row = editRows[i];
- var td = $(row).find('td');
- var key = $(td[0]).text();
- var val = $(td[1]).text();
-
- if (key && key.length > 0) {
- flowOverride[key] = val;
- }
- }
-
- var scheduleData = {
- project: projectName,
- ajax: "advScheduleFlow",
- flow: flowName,
- hour: $('#advHour').val(),
- min: $('#advMinutes').val(),
- am_pm: $('#advAm_pm').val(),
- timezone: $('#advTimezone').val(),
- date: $('#advDate').val(),
- period: $('#advPeriod').val()+$('#advPeriod_units').val(),
- disable: this.cloneModel.get('disabled'),
- failureAction: failureAction,
- failureEmails: failureEmails,
- successEmails: successEmails,
- notifyFailureFirst: notifyFailureFirst,
- notifyFailureLast: notifyFailureLast,
- executingJobOption: executingJobOption,
- flowOverride: flowOverride
- };
-
- $.get(
- scheduleURL,
- scheduleData,
- function(data) {
- if (data.error) {
- alert(data.error);
- }
- else {
- var redirectURL = contextURL + "/schedule";
- window.location.href = redirectURL;
- }
- },
- "json"
- );
- },
- handleAddRow: function(evt) {
- var tr = document.createElement("tr");
- var tdName = document.createElement("td");
- var tdValue = document.createElement("td");
-
- var icon = document.createElement("span");
- $(icon).addClass("removeIcon");
- var nameData = document.createElement("span");
- $(nameData).addClass("spanValue");
- var valueData = document.createElement("span");
- $(valueData).addClass("spanValue");
-
- $(tdName).append(icon);
- $(tdName).append(nameData);
- $(tdName).addClass("name");
- $(tdName).addClass("editable");
-
- $(tdValue).append(valueData);
- $(tdValue).addClass("editable");
-
- $(tr).addClass("editRow");
- $(tr).append(tdName);
- $(tr).append(tdValue);
-
- $(tr).insertBefore("#scheduleAddRow");
- return tr;
- },
- handleEditColumn : function(evt) {
- var curTarget = evt.currentTarget;
-
- if (this.editingTarget != curTarget) {
- this.closeEditingTarget();
-
- var text = $(curTarget).children(".spanValue").text();
- $(curTarget).empty();
-
- var input = document.createElement("input");
- $(input).attr("type", "text");
- $(input).css("width", "100%");
- $(input).val(text);
- $(curTarget).addClass("editing");
- $(curTarget).append(input);
- $(input).focus();
- this.editingTarget = curTarget;
- }
- },
- handleRemoveColumn : function(evt) {
- var curTarget = evt.currentTarget;
- // Should be the table
- var row = curTarget.parentElement.parentElement;
- $(row).remove();
- },
- closeEditingTarget: function(evt) {
- if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
- var input = $(this.editingTarget).children("input")[0];
- var text = $(input).val();
- $(input).remove();
-
- var valueData = document.createElement("span");
- $(valueData).addClass("spanValue");
- $(valueData).text(text);
-
- if ($(this.editingTarget).hasClass("name")) {
- var icon = document.createElement("span");
- $(icon).addClass("removeIcon");
- $(this.editingTarget).append(icon);
- }
-
- $(this.editingTarget).removeClass("editing");
- $(this.editingTarget).append(valueData);
- this.editingTarget = null;
- }
- },
- handleDisableMenuClick : function(action, el, pos) {
- var jobid = el[0].jobid;
- var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
- if (action == "open") {
- window.location.href = requestURL;
- }
- else if(action == "openwindow") {
- window.open(requestURL);
- }
- else if(action == "disable") {
- var disabled = scheduleModel.get("disabled");
-
- disabled[jobid] = true;
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if(action == "disableAll") {
- var disabled = scheduleModel.get("disabled");
-
- var nodes = scheduleModel.get("nodes");
- for (var key in nodes) {
- disabled[key] = true;
- }
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "disableParents") {
- var disabled = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
- var inNodes = nodes[jobid].inNodes;
-
- if (inNodes) {
- for (var key in inNodes) {
- disabled[key] = true;
- }
- }
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "disableChildren") {
- var disabledMap = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
- var outNodes = nodes[jobid].outNodes;
-
- if (outNodes) {
- for (var key in outNodes) {
- disabledMap[key] = true;
- }
- }
-
- scheduleModel.set({disabled: disabledMap});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "disableAncestors") {
- var disabled = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
-
- recurseAllAncestors(nodes, disabled, jobid, true);
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "disableDescendents") {
- var disabled = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
-
- recurseAllDescendents(nodes, disabled, jobid, true);
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if(action == "enable") {
- var disabled = scheduleModel.get("disabled");
-
- disabled[jobid] = false;
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if(action == "enableAll") {
- disabled = {};
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "enableParents") {
- var disabled = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
- var inNodes = nodes[jobid].inNodes;
-
- if (inNodes) {
- for (var key in inNodes) {
- disabled[key] = false;
- }
- }
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "enableChildren") {
- var disabled = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
- var outNodes = nodes[jobid].outNodes;
-
- if (outNodes) {
- for (var key in outNodes) {
- disabled[key] = false;
- }
- }
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- else if (action == "enableAncestors") {
- var disabled = scheduleModel.get("disabled");
- var nodes = scheduleModel.get("nodes");
-
- recurseAllAncestors(nodes, disabled, jobid, false);
-
- this.cloneModel.set({disabled: disabled});
- this.cloneModel.trigger("change:disabled");
- }
- else if (action == "enableDescendents") {
- var disabled = this.cloneModel.get("disabled");
- var nodes = this.cloneModel.get("nodes");
-
- recurseAllDescendents(nodes, disabled, jobid, false);
-
- scheduleModel.set({disabled: disabled});
- scheduleModel.trigger("change:disabled");
- }
- }
-});
-
azkaban.ExecuteFlowView = Backbone.View.extend({
events : {
"click" : "closeEditingTarget",
@@ -668,21 +247,21 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
flowOverride[key] = val;
}
}
-
- var executingData = {
- project: projectName,
- ajax: "executeFlow",
- flow: flowName,
- disable: this.cloneModel.get('disabled'),
- failureAction: failureAction,
- failureEmails: failureEmails,
- successEmails: successEmails,
- notifyFailureFirst: notifyFailureFirst,
- notifyFailureLast: notifyFailureLast,
- executingJobOption: executingJobOption,
- flowOverride: flowOverride
- };
-
+
+ var executingData = {
+ project: projectName,
+ ajax: "executeFlow",
+ flow: flowName,
+ disable: this.cloneModel.get('disabled'),
+ failureAction: failureAction,
+ failureEmails: failureEmails,
+ successEmails: successEmails,
+ notifyFailureFirst: notifyFailureFirst,
+ notifyFailureLast: notifyFailureLast,
+ executingJobOption: executingJobOption,
+ flowOverride: flowOverride
+ };
+
executeFlow(executingData);
},
handleAddRow: function(evt) {
@@ -729,7 +308,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
$(curTarget).append(input);
$(input).focus();
this.editingTarget = curTarget;
- }
+ }
},
handleRemoveColumn : function(evt) {
var curTarget = evt.currentTarget;
src/web/js/azkaban.flow.view.js 85(+2 -83)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 7c05e77..584e3a4 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -280,88 +280,6 @@ azkaban.GraphModel = Backbone.Model.extend({});
var executionModel;
azkaban.ExecutionModel = Backbone.Model.extend({});
-var scheduleFlowView;
-azkaban.ScheduleFlowView = Backbone.View.extend({
- events : {
- "click #schedule-btn": "handleScheduleFlow",
- "click #adv-schedule-opt-btn": "handleAdvancedSchedule"
- },
- initialize : function(settings) {
- $( "#datepicker" ).datepicker();
- $( "#datepicker" ).datepicker('setDate', new Date());
- $("#errorMsg").hide();
- },
- handleAdvancedSchedule : function(evt) {
- console.log("Clicked advanced schedule options button");
- //$('#confirm-container').hide();
- $.modal.close();
- advancedScheduleView.show();
- },
- show: function() {
-// this.cloneModel = this.model.clone();
-// cloneModel = this.cloneModel;
- },
- handleScheduleFlow : function(evt) {
-
- var hourVal = $('#hour').val();
- var minutesVal = $('#minutes').val();
- var ampmVal = $('#am_pm').val();
- var timezoneVal = $('#timezone').val();
- var dateVal = $('#datepicker').val();
- var is_recurringVal = $('#is_recurring').val();
- var periodVal = $('#period').val();
- var periodUnits = $('#period_units').val();
-
- console.log("Creating schedule for "+projectName+"."+flowName);
- $.ajax({
- async: "false",
- url: "schedule",
- dataType: "json",
- type: "POST",
- data: {
- action:"scheduleFlow",
- projectId:projectId,
- projectName:projectName,
- flowName:flowName,
- hour:hourVal,
- minutes:minutesVal,
- am_pm:ampmVal,
- timezone:timezoneVal,
- date:dateVal,
- userExec:"dummy",
- is_recurring:is_recurringVal,
- period:periodVal,
- period_units:periodUnits
- },
- success: function(data) {
- if (data.status == "success") {
- console.log("Successfully scheduled for "+projectName+"."+flowName);
- if (data.action == "redirect") {
- window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
- }
- else{
- $("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );
- window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
- }
- }
- else {
- if (data.action == "login") {
- window.location = "";
- }
- else {
- $("#errorMsg").text("ERROR: " + data.message);
- $("#errorMsg").slideDown("fast");
- }
- }
- }
- });
-
- },
- render: function() {
- }
-});
-
-
$(function() {
var selected;
// Execution model has to be created before the window switches the tabs.
@@ -374,8 +292,9 @@ $(function() {
svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow'), model: graphModel});
- executeFlowView = new azkaban.ExecuteFlowView({el:$('#executing-options'), model: graphModel});
advancedScheduleView = new azkaban.AdvancedScheduleView({el:$('#schedule-options'), model: graphModel});
+ executeFlowView = new azkaban.ExecuteFlowView({el:$('#executing-options'), model: graphModel});
+
var requestURL = contextURL + "/manager";
// Set up the Flow options view. Create a new one every time :p
src/web/js/azkaban.schedule.options.view.js 586(+586 -0)
diff --git a/src/web/js/azkaban.schedule.options.view.js b/src/web/js/azkaban.schedule.options.view.js
new file mode 100644
index 0000000..5b9b760
--- /dev/null
+++ b/src/web/js/azkaban.schedule.options.view.js
@@ -0,0 +1,586 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+$.namespace('azkaban');
+
+var scheduleCustomSvgGraphView;
+var scheduleCustomJobListView;
+
+var scheduleFlowView;
+
+var scheduleFlowData;
+
+//function recurseAllAncestors(nodes, disabledMap, id, disable) {
+// var node = nodes[id];
+//
+// if (node.inNodes) {
+// for (var key in node.inNodes) {
+// disabledMap[key] = disable;
+// recurseAllAncestors(nodes, disabledMap, key, disable);
+// }
+// }
+//}
+//
+//function recurseAllDescendents(nodes, disabledMap, id, disable) {
+// var node = nodes[id];
+//
+// if (node.outNodes) {
+// for (var key in node.outNodes) {
+// disabledMap[key] = disable;
+// recurseAllDescendents(nodes, disabledMap, key, disable);
+// }
+// }
+//}
+//
+azkaban.ScheduleContextMenu = Backbone.View.extend({
+ events : {
+ "click #scheduleDisableArrow" : "handleDisabledClick",
+ "click #scheduleEnableArrow" : "handleEnabledClick"
+ },
+ initialize: function(settings) {
+ $('#scheduleDisableSub').hide();
+ $('#scheduleEnableSub').hide();
+ },
+ handleEnabledClick: function(evt) {
+ if(evt.stopPropagation) {
+ evt.stopPropagation();
+ }
+ evt.cancelBubble=true;
+
+ if (evt.currentTarget.expanded) {
+ evt.currentTarget.expanded=false;
+ $('#scheduleEnableArrow').removeClass('collapse');
+ $('#scheduleEnableSub').hide();
+ }
+ else {
+ evt.currentTarget.expanded=true;
+ $('#scheduleEnableArrow').addClass('collapse');
+ $('#scheduleEnableSub').show();
+ }
+ },
+ handleDisabledClick: function(evt) {
+ if(evt.stopPropagation) {
+ evt.stopPropagation();
+ }
+ evt.cancelBubble=true;
+
+ if (evt.currentTarget.expanded) {
+ evt.currentTarget.expanded=false;
+ $('#scheduleDisableArrow').removeClass('collapse');
+ $('#scheduleDisableSub').hide();
+ }
+ else {
+ evt.currentTarget.expanded=true;
+ $('#scheduleDisableArrow').addClass('collapse');
+ $('#scheduleDisableSub').show();
+ }
+ }
+});
+
+azkaban.ScheduleFlowView = Backbone.View.extend({
+ events : {
+ "click #schedule-btn": "handleScheduleFlow",
+ "click #adv-schedule-opt-btn": "handleAdvancedSchedule"
+ },
+ initialize : function(settings) {
+ $( "#datepicker" ).datepicker();
+ $( "#datepicker" ).datepicker('setDate', new Date());
+ $("#errorMsg").hide();
+ },
+ handleAdvancedSchedule : function(evt) {
+ console.log("Clicked advanced schedule options button");
+ //$('#confirm-container').hide();
+ $.modal.close();
+ advancedScheduleView.show();
+ },
+ handleScheduleFlow : function(evt) {
+
+ var hourVal = $('#hour').val();
+ var minutesVal = $('#minutes').val();
+ var ampmVal = $('#am_pm').val();
+ var timezoneVal = $('#timezone').val();
+ var dateVal = $('#datepicker').val();
+ var is_recurringVal = $('#is_recurring').val();
+ var periodVal = $('#period').val();
+ var periodUnits = $('#period_units').val();
+
+ console.log("Creating schedule for "+projectName+"."+flowName);
+ $.ajax({
+ async: "false",
+ url: "schedule",
+ dataType: "json",
+ type: "POST",
+ data: {
+ action:"scheduleFlow",
+ projectId:projectId,
+ projectName:projectName,
+ flowName:flowName,
+ hour:hourVal,
+ minutes:minutesVal,
+ am_pm:ampmVal,
+ timezone:timezoneVal,
+ date:dateVal,
+ userExec:"dummy",
+ is_recurring:is_recurringVal,
+ period:periodVal,
+ period_units:periodUnits
+ },
+ success: function(data) {
+ if (data.status == "success") {
+ console.log("Successfully scheduled for "+projectName+"."+flowName);
+ if (data.action == "redirect") {
+ window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
+ }
+ else{
+ $("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );
+ window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
+ }
+ }
+ else {
+ if (data.action == "login") {
+ window.location = "";
+ }
+ else {
+ $("#errorMsg").text("ERROR: " + data.message);
+ $("#errorMsg").slideDown("fast");
+ }
+ }
+ }
+ });
+
+ },
+ render: function() {
+ }
+});
+
+azkaban.AdvancedScheduleView = Backbone.View.extend({
+ events : {
+ "click" : "closeEditingTarget",
+ "click #adv-schedule-btn": "handleAdvSchedule",
+ "click #schedule-cancel-btn": "handleCancel",
+ "click .modal-close": "handleCancel",
+ "click #scheduleGeneralOptions": "handleGeneralOptionsSelect",
+ "click #scheduleFlowOptions": "handleFlowOptionsSelect",
+ "click #scheduleAddRow": "handleAddRow",
+ "click table .editable": "handleEditColumn",
+ "click table .removeIcon": "handleRemoveColumn"
+ },
+ initialize: function(setting) {
+ this.contextMenu = new azkaban.ScheduleContextMenu({el:$('#scheduleDisableJobMenu')});
+ this.handleGeneralOptionsSelect();
+ $( "#advdatepicker" ).datepicker();
+ $( "#advdatepicker" ).datepicker('setDate', new Date());
+ },
+ show: function() {
+ $('#scheduleModalBackground').show();
+ $('#schedule-options').show();
+ this.handleGeneralOptionsSelect();
+
+ scheduleFlowData = this.model.clone();
+ this.flowData = scheduleFlowData;
+ var flowData = scheduleFlowData;
+
+ var fetchData = {"project": projectName, "ajax":"flowInfo", "flow":flowName};
+
+ var executeURL = contextURL + "/executor";
+ this.executeURL = executeURL;
+ var scheduleURL = contextURL + "/schedule";
+ this.scheduleURL = scheduleURL;
+ var handleAddRow = this.handleAddRow;
+
+ var data = flowData.get("data");
+ var nodes = {};
+ for (var i=0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ nodes[node.id] = node;
+ }
+
+ for (var i=0; i < data.edges.length; ++i) {
+ var edge = data.edges[i];
+ var fromNode = nodes[edge.from];
+ var toNode = nodes[edge.target];
+
+ if (!fromNode.outNodes) {
+ fromNode.outNodes = {};
+ }
+ fromNode.outNodes[toNode.id] = toNode;
+
+ if (!toNode.inNodes) {
+ toNode.inNodes = {};
+ }
+ toNode.inNodes[fromNode.id] = fromNode;
+ }
+ flowData.set({nodes: nodes});
+
+ var disabled = {};
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var updateNode = data.nodes[i];
+ if (updateNode.status == "DISABLED" || updateNode.status == "SKIPPED") {
+ updateNode.status = "READY";
+ disabled[updateNode.id] = true;
+ }
+ if (updateNode.status == "SUCCEEDED" || updateNode.status=="RUNNING") {
+ disabled[updateNode.id] = true;
+ }
+ }
+ flowData.set({disabled: disabled});
+
+ $.get(
+ executeURL,
+ fetchData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ if (data.successEmails) {
+ $('#scheduleSuccessEmails').val(data.successEmails.join());
+ }
+ if (data.failureEmails) {
+ $('#scheduleFailureEmails').val(data.failureEmails.join());
+ }
+
+ if (data.failureAction) {
+ $('#scheduleFailureAction').val(data.failureAction);
+ }
+ if (data.notifyFailureFirst) {
+ $('#scheduleNotifyFailureFirst').attr('checked', true);
+ }
+ if (data.notifyFailureLast) {
+ $('#scheduleNotifyFailureLast').attr('checked', true);
+ }
+ if (data.flowParam) {
+ var flowParam = data.flowParam;
+ for (var key in flowParam) {
+ var row = handleAddRow();
+ var td = $(row).find('td');
+ $(td[0]).text(key);
+ $(td[1]).text(flowParam[key]);
+ }
+ }
+
+ if (!data.running || data.running.length == 0) {
+ $(".radio").attr("disabled", "disabled");
+ $(".radioLabel").addClass("disabled", "disabled");
+ }
+ }
+ },
+ "json"
+ );
+ },
+ handleCancel: function(evt) {
+ $('#scheduleModalBackground').hide();
+ $('#schedule-options').hide();
+ },
+ handleGeneralOptionsSelect: function(evt) {
+ $('#scheduleFlowOptions').removeClass('selected');
+ $('#scheduleGeneralOptions').addClass('selected');
+
+ $('#scheduleGeneralPanel').show();
+ $('#scheduleGraphPanel').hide();
+ },
+ handleFlowOptionsSelect: function(evt) {
+ $('#scheduleGeneralOptions').removeClass('selected');
+ $('#scheduleFlowOptions').addClass('selected');
+
+ $('#scheduleGraphPanel').show();
+ $('#scheduleGeneralPanel').hide();
+
+ if (this.flowSetup) {
+ return;
+ }
+
+ scheduleCustomSvgGraphView = new azkaban.SvgGraphView({el:$('#scheduleSvgDivCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+ scheduleCustomJobsListView = new azkaban.JobListView({el:$('#scheduleJobListCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+ scheduleFlowData.trigger("change:graph");
+
+ this.flowSetup = true;
+ },
+ handleAdvSchedule: function(evt) {
+ var scheduleURL = this.scheduleURL;
+ var disabled = this.flowData.get("disabled");
+ var disabledJobs = "";
+ for(var job in disabled) {
+ if(disabled[job] == true) {
+ disabledJobs += "," + job;
+ }
+ }
+ var failureAction = $('#scheduleFailureAction').val();
+ var failureEmails = $('#scheduleFailureEmails').val();
+ var successEmails = $('#scheduleSuccessEmails').val();
+ var notifyFailureFirst = $('#scheduleNotifyFailureFirst').is(':checked');
+ var notifyFailureLast = $('#scheduleNotifyFailureLast').is(':checked');
+ var executingJobOption = $('input:radio[name=gender]:checked').val();
+
+
+ var scheduleTime = $('#advhour').val() + "," + $('#advminutes').val() + "," + $('#advam_pm').val() + "," + $('#advtimezone').val();
+ var scheduleDate = $('#advdatepicker').val();
+ var is_recurring = $('#advis_recurring').val();
+ var period = $('#advperiod').val() + $('#advperiod_units').val();
+
+ var flowOverride = {};
+ var editRows = $(".editRow");
+ for (var i = 0; i < editRows.length; ++i) {
+ var row = editRows[i];
+ var td = $(row).find('td');
+ var key = $(td[0]).text();
+ var val = $(td[1]).text();
+
+ if (key && key.length > 0) {
+ flowOverride[key] = val;
+ }
+ }
+
+ var scheduleData = {
+ projectId:projectId,
+ projectName: projectName,
+ ajax: "advSchedule",
+ flowName: flowName,
+ scheduleTime: scheduleTime,
+ scheduleDate: scheduleDate,
+ is_recurring: is_recurring,
+ period: period,
+ disabledJobs: disabledJobs,
+ failureAction: failureAction,
+ failureEmails: failureEmails,
+ successEmails: successEmails,
+ notifyFailureFirst: notifyFailureFirst,
+ notifyFailureLast: notifyFailureLast,
+ executingJobOption: executingJobOption,
+ flowOverride: flowOverride
+ };
+
+ $.post(
+ scheduleURL,
+ scheduleData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ window.location = scheduleURL;
+ }
+ },
+ "json"
+ )
+ },
+ handleAddRow: function(evt) {
+ var tr = document.createElement("tr");
+ var tdName = document.createElement("td");
+ var tdValue = document.createElement("td");
+
+ var icon = document.createElement("span");
+ $(icon).addClass("removeIcon");
+ var nameData = document.createElement("span");
+ $(nameData).addClass("spanValue");
+ var valueData = document.createElement("span");
+ $(valueData).addClass("spanValue");
+
+ $(tdName).append(icon);
+ $(tdName).append(nameData);
+ $(tdName).addClass("name");
+ $(tdName).addClass("editable");
+
+ $(tdValue).append(valueData);
+ $(tdValue).addClass("editable");
+
+ $(tr).addClass("editRow");
+ $(tr).append(tdName);
+ $(tr).append(tdValue);
+
+ $(tr).insertBefore("#scheduleAddRow");
+ return tr;
+ },
+ handleEditColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+
+ if (this.editingTarget != curTarget) {
+ this.closeEditingTarget();
+
+ var text = $(curTarget).children(".spanValue").text();
+ $(curTarget).empty();
+
+ var input = document.createElement("input");
+ $(input).attr("type", "text");
+ $(input).css("width", "100%");
+ $(input).val(text);
+ $(curTarget).addClass("editing");
+ $(curTarget).append(input);
+ $(input).focus();
+ this.editingTarget = curTarget;
+ }
+ },
+ handleRemoveColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+ // Should be the table
+ var row = curTarget.parentElement.parentElement;
+ $(row).remove();
+ },
+ closeEditingTarget: function(evt) {
+ if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
+ var input = $(this.editingTarget).children("input")[0];
+ var text = $(input).val();
+ $(input).remove();
+
+ var valueData = document.createElement("span");
+ $(valueData).addClass("spanValue");
+ $(valueData).text(text);
+
+ if ($(this.editingTarget).hasClass("name")) {
+ var icon = document.createElement("span");
+ $(icon).addClass("removeIcon");
+ $(this.editingTarget).append(icon);
+ }
+
+ $(this.editingTarget).removeClass("editing");
+ $(this.editingTarget).append(valueData);
+ this.editingTarget = null;
+ }
+ },
+ handleDisableMenuClick : function(action, el, pos) {
+ var flowData = scheduleFlowData;
+ var jobid = el[0].jobid;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
+ if (action == "open") {
+ window.location.href = requestURL;
+ }
+ else if(action == "openwindow") {
+ window.open(requestURL);
+ }
+ else if(action == "disable") {
+ var disabled = flowData.get("disabled");
+
+ disabled[jobid] = true;
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if(action == "disableAll") {
+ var disabled = flowData.get("disabled");
+
+ var nodes = flowData.get("nodes");
+ for (var key in nodes) {
+ disabled[key] = true;
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableParents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var inNodes = nodes[jobid].inNodes;
+
+ if (inNodes) {
+ for (var key in inNodes) {
+ disabled[key] = true;
+ }
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableChildren") {
+ var disabledMap = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var outNodes = nodes[jobid].outNodes;
+
+ if (outNodes) {
+ for (var key in outNodes) {
+ disabledMap[key] = true;
+ }
+ }
+
+ flowData.set({disabled: disabledMap});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableAncestors") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllAncestors(nodes, disabled, jobid, true);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableDescendents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllDescendents(nodes, disabled, jobid, true);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if(action == "enable") {
+ var disabled = flowData.get("disabled");
+
+ disabled[jobid] = false;
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if(action == "enableAll") {
+ disabled = {};
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableParents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var inNodes = nodes[jobid].inNodes;
+
+ if (inNodes) {
+ for (var key in inNodes) {
+ disabled[key] = false;
+ }
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableChildren") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var outNodes = nodes[jobid].outNodes;
+
+ if (outNodes) {
+ for (var key in outNodes) {
+ disabled[key] = false;
+ }
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableAncestors") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllAncestors(nodes, disabled, jobid, false);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableDescendents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllDescendents(nodes, disabled, jobid, false);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ }
+});
\ No newline at end of file
src/web/js/azkaban.scheduled.view.js 260(+164 -96)
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index 9b0ee9a..b66374b 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -10,11 +10,11 @@ function removeSched(projectId, flowName) {
function(data) {
if (data.error) {
// alert(data.error)
- $('#errorMsg').text(data.error)
+ $('#errorMsg').text(data.error);
}
else {
// alert("Schedule "+schedId+" removed!")
- window.location = redirectURL
+ window.location = redirectURL;
}
},
"json"
@@ -48,6 +48,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
"click #remove-sla-btn": "handleRemoveSla",
"click #sla-cancel-btn": "handleSlaCancel",
"click .modal-close": "handleSlaCancel",
+ "click #addRow": "handleAddRow"
},
initialize: function(setting) {
@@ -58,12 +59,31 @@ azkaban.ChangeSlaView = Backbone.View.extend({
$('#slaModalBackground').hide();
$('#sla-options').hide();
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ var rows = tFlowRules.rows;
+ var rowLength = rows.length
+ for(var i = 0; i < rowLength-1; i++) {
+ tFlowRules.deleteRow(0);
+ }
+
},
initFromSched: function(projId, flowName) {
this.projectId = projId;
this.flowName = flowName;
- this.scheduleURL = contextURL + "/schedule"
- var fetchScheduleData = {"projId": this.projectId, "ajax":"schedInfo", "flowName":this.flowName};
+
+ var scheduleURL = contextURL + "/schedule"
+ this.scheduleURL = scheduleURL;
+ var indexToName = {};
+ var nameToIndex = {};
+ var indexToText = {};
+ this.indexToName = indexToName;
+ this.nameToIndex = nameToIndex;
+ this.indexToText = indexToText;
+ var ruleBoxOptions = ["SUCCESS", "FINISH"];
+ this.ruleBoxOptions = ruleBoxOptions;
+
+ var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
$.get(
this.scheduleURL,
@@ -76,53 +96,84 @@ azkaban.ChangeSlaView = Backbone.View.extend({
if (data.slaEmails) {
$('#slaEmails').val(data.slaEmails.join());
}
- var flowRulesTbl = document.getElementById("flowRulesTbl").tBodies[0];
- var flowRuleRow = flowRulesTbl.insertRow(-1);
- var cflowName = flowRuleRow.insertCell(0);
- cflowName.innerHTML = flowName;
- var cflowduration = flowRuleRow.insertCell(1);
- var flowDuration = document.createElement("input");
- flowDuration.setAttribute("type", "text");
- flowDuration.setAttribute("id", "flowDuration");
- flowDuration.setAttribute("class", "durationpick");
- if(data.flowRules) {
- flowDuration.setAttribute("value", data.flowRules.duration);
+
+ var allJobNames = data.allJobNames;
+
+ indexToName[0] = "";
+ nameToIndex[flowName] = 0;
+ indexToText[0] = "flow " + flowName;
+ for(var i = 1; i <= allJobNames.length; i++) {
+ indexToName[i] = allJobNames[i-1];
+ nameToIndex[allJobNames[i-1]] = i;
+ indexToText[i] = "job " + allJobNames[i-1];
}
- cflowduration.appendChild(flowDuration);
- var emailAct = flowRuleRow.insertCell(2);
- var checkEmailAct = document.createElement("input");
- checkEmailAct.setAttribute("type", "checkbox");
- emailAct.appendChild(checkEmailAct);
- var killAct = flowRuleRow.insertCell(3);
- var checkKillAct = document.createElement("input");
- checkKillAct.setAttribute("type", "checkbox");
- killAct.appendChild(checkKillAct);
- var jobRulesTbl = document.getElementById("jobRulesTbl").tBodies[0];
- var allJobs = data.allJobs;
- for (var job in allJobs) {
+
+
+
+
+ // populate with existing settings
+ if(data.settings) {
- var jobRuleRow = jobRulesTbl.insertRow(-1);
- var cjobName = jobRuleRow.insertCell(0);
- cjobName.innerHTML = allJobs[job];
- var cjobduration = jobRuleRow.insertCell(1);
- var jobDuration = document.createElement("input");
- jobDuration.setAttribute("type", "text");
- jobDuration.setAttribute("id", "jobDuration");
- jobDuration.setAttribute("class", "durationpick");
- if(data.jobRules) {
- jobDuration.setAttribute("value", data.jobRules[job].duration);
- }
- cjobduration.appendChild(jobDuration);
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
- var emailAct = jobRuleRow.insertCell(2);
- var checkEmailAct = document.createElement("input");
- checkEmailAct.setAttribute("type", "checkbox");
- emailAct.appendChild(checkEmailAct);
- var killAct = jobRuleRow.insertCell(3);
- var checkKillAct = document.createElement("input");
- checkKillAct.setAttribute("type", "checkbox");
- killAct.appendChild(checkKillAct);
+ for(var setting in data.settings) {
+ var rFlowRule = tFlowRules.insertRow(0);
+
+ var cId = rFlowRule.insertCell(-1);
+ var idSelect = document.createElement("select");
+ for(var i in indexToName) {
+ idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+ if(data.settings[setting].id == indexToName[i]) {
+ idSelect.options[i].selected = true;
+ }
+ }
+ cId.appendChild(idSelect);
+
+ var cRule = rFlowRule.insertCell(-1);
+ var ruleSelect = document.createElement("select");
+ for(var i in ruleBoxOptions) {
+ ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+ if(data.settings[setting].rule == ruleBoxOptions[i]) {
+ ruleSelect.options[i].selected = true;
+ }
+ }
+ cRule.appendChild(ruleSelect);
+
+ var cDuration = rFlowRule.insertCell(-1);
+ var duration = document.createElement("input");
+ duration.type = "text";
+ duration.setAttribute("class", "durationpick");
+ var rawMinutes = data.settings[setting].duration;
+ var intMinutes = rawMinutes.substring(0, rawMinutes.length-1);
+ var minutes = parseInt(intMinutes);
+ var hours = Math.floor(minutes / 60);
+ minutes = minutes % 60;
+ duration.value = hours + ":" + minutes;
+ cDuration.appendChild(duration);
+
+ var cEmail = rFlowRule.insertCell(-1);
+ var emailCheck = document.createElement("input");
+ emailCheck.type = "checkbox";
+ for(var act in data.settings[setting].actions) {
+ if(data.settings[setting].actions[act] == "EMAIL") {
+ emailCheck.checked = true;
+ }
+ }
+ cEmail.appendChild(emailCheck);
+
+ var cKill = rFlowRule.insertCell(-1);
+ var killCheck = document.createElement("input");
+ killCheck.type = "checkbox";
+ for(var act in data.settings[setting].actions) {
+ if(data.settings[setting].actions[act] == "KILL") {
+ killCheck.checked = true;
+ }
+ }
+ cKill.appendChild(killCheck);
+
+ $('.durationpick').timepicker({hourMax: 99});
+ }
}
$('.durationpick').timepicker({hourMax: 99});
}
@@ -139,8 +190,8 @@ azkaban.ChangeSlaView = Backbone.View.extend({
},
handleRemoveSla: function(evt) {
console.log("Clicked remove sla button");
- var scheduleURL = contextURL + "/schedule"
- var redirectURL = contextURL + "/schedule"
+ var scheduleURL = this.scheduleURL;
+ var redirectURL = this.scheduleURL;
$.post(
scheduleURL,
{"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
@@ -159,55 +210,90 @@ azkaban.ChangeSlaView = Backbone.View.extend({
handleSetSla: function(evt) {
var slaEmails = $('#slaEmails').val();
-
-// var flowRules = {};
- var flowRulesTbl = document.getElementById("flowRulesTbl").tBodies[0];
- var flowRuleRow = flowRulesTbl.rows[0];
-// flowRules["flowDuration"] = flowRuleRow.cells[1].firstChild.value;
-// flowRules["flowEmailAction"] = flowRuleRow.cells[2].firstChild.value;
-// flowRules["flowKillAction"] = flowRuleRow.cells[3].firstChild.value;
- var flowRules = flowRuleRow.cells[1].firstChild.value + ',' + flowRuleRow.cells[2].firstChild.value + ',' + flowRuleRow.cells[3].firstChild.value;
+ var settings = {};
- var jobRules = {};
- var jobRulesTbl = document.getElementById("jobRulesTbl").tBodies[0];
- console.log(jobRulesTbl.rows.length);
- for(var row = 0; row < jobRulesTbl.rows.length; row++) {
-
- var jobRow = jobRulesTbl.rows[row];
- var jobRule = {};
-
- console.log(row);
- console.log(jobRow.cells[0].firstChild.value);
-// jobRule["jobDuration"] = jobRow.cells[1].firstChild.value;
-// jobRule["jobEmailAction"] = jobRow.cells[2].firstChild.value;
-// jobRule["jobKillAction"] = jobRow.cells[3].firstChild.value;
-// jobRules[jobRow.cells[0].innerHTML] = jobRule;
- jobRules[jobRow.cells[0].innerHTML] = jobRow.cells[1].firstChild.value + ',' + jobRow.cells[2].firstChild.value + ',' + jobRow.cells[3].firstChild.value;
- }
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ for(var row = 0; row < tFlowRules.rows.length-1; row++) {
+ var rFlowRule = tFlowRules.rows[row];
+ var id = rFlowRule.cells[0].firstChild.value;
+ var rule = rFlowRule.cells[1].firstChild.value;
+ var duration = rFlowRule.cells[2].firstChild.value;
+ var email = rFlowRule.cells[3].firstChild.value;
+ var kill = rFlowRule.cells[4].firstChild.value;
+ settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill;
+ }
+
var slaData = {
projectId: this.projectId,
flowName: this.flowName,
ajax: "setSla",
slaEmails: slaEmails,
- flowRules: flowRules,
- jobRules: jobRules
+ settings: settings
};
- $.get(
- this.scheduleURL,
+ var scheduleURL = this.scheduleURL;
+
+ $.post(
+ scheduleURL,
slaData,
function(data) {
if (data.error) {
alert(data.error);
}
else {
- window.location.href = this.scheduleURL;
+ tFlowRules.length = 0;
+ window.location = scheduleURL;
}
},
"json"
);
},
+ handleAddRow: function(evt) {
+
+ var indexToName = this.indexToName;
+ var nameToIndex = this.nameToIndex;
+ var indexToText = this.indexToText;
+ var ruleBoxOptions = this.ruleBoxOptions;
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ var rFlowRule = tFlowRules.insertRow(0);
+
+ var cId = rFlowRule.insertCell(-1);
+ var idSelect = document.createElement("select");
+ for(var i in indexToName) {
+ idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+ }
+
+ cId.appendChild(idSelect);
+
+ var cRule = rFlowRule.insertCell(-1);
+ var ruleSelect = document.createElement("select");
+ for(var i in ruleBoxOptions) {
+ ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+ }
+ cRule.appendChild(ruleSelect);
+
+ var cDuration = rFlowRule.insertCell(-1);
+ var duration = document.createElement("input");
+ duration.type = "text";
+ duration.setAttribute("class", "durationpick");
+ cDuration.appendChild(duration);
+
+ var cEmail = rFlowRule.insertCell(-1);
+ var emailCheck = document.createElement("input");
+ emailCheck.type = "checkbox";
+ cEmail.appendChild(emailCheck);
+
+ var cKill = rFlowRule.insertCell(-1);
+ var killCheck = document.createElement("input");
+ killCheck.type = "checkbox";
+ cKill.appendChild(killCheck);
+
+ $('.durationpick').timepicker({hourMax: 99});
+
+ return rFlowRule;
+ },
handleEditColumn : function(evt) {
var curTarget = evt.currentTarget;
@@ -234,25 +320,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
$(row).remove();
},
closeEditingTarget: function(evt) {
- if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
- var input = $(this.editingTarget).children("input")[0];
- var text = $(input).val();
- $(input).remove();
-
- var valueData = document.createElement("span");
- $(valueData).addClass("spanValue");
- $(valueData).text(text);
- if ($(this.editingTarget).hasClass("name")) {
- var icon = document.createElement("span");
- $(icon).addClass("removeIcon");
- $(this.editingTarget).append(icon);
- }
-
- $(this.editingTarget).removeClass("editing");
- $(this.editingTarget).append(valueData);
- this.editingTarget = null;
- }
}
});
unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java 164(+106 -58)
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index f6d73f2..4a9491c 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -4,6 +4,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -15,11 +16,23 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.joda.time.DateTimeZone;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.MutablePeriod;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+import org.joda.time.ReadablePeriod;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.Props;
@@ -103,26 +116,34 @@ public class JdbcScheduleLoaderTest {
JdbcScheduleLoader loader = createLoader();
- Map<String, Object> scheduleOptions = new HashMap<String, Object>();
- List<String> disabled = new ArrayList<String>();
- disabled.add("job1");
- disabled.add("job2");
- disabled.add("job3");
- List<String> failEmails = new ArrayList<String>();
- failEmails.add("email1");
- failEmails.add("email2");
- failEmails.add("email3");
- boolean hasSla = true;
- scheduleOptions.put("disabled", disabled);
- scheduleOptions.put("failEmails", failEmails);
- scheduleOptions.put("hasSla", hasSla);
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+ List<String> disabledJobs = new ArrayList<String>();
+ disabledJobs.add("job1");
+ disabledJobs.add("job2");
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ SlaSetting set1 = new SlaSetting();
+ List<SlaAction> actions = new ArrayList<SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ set1.setActions(actions);
+ set1.setId("");
+ set1.setDuration(Schedule.parsePeriodString("1h"));
+ set1.setRule(SlaRule.FINISH);
+ slaSets.add(set1);
+ FlowOptions flowOptions = new FlowOptions();
+ flowOptions.setFailureEmails(emails);
+ flowOptions.setDisabledJobs(disabledJobs);
+ SlaOptions slaOptions = new SlaOptions();
+ slaOptions.setSlaEmails(emails);
+ slaOptions.setSettings(slaSets);
- Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
- Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", scheduleOptions);
- Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
- Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
- Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
- Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
+ Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
+ Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
loader.insertSchedule(s1);
loader.insertSchedule(s2);
@@ -132,13 +153,23 @@ public class JdbcScheduleLoaderTest {
loader.insertSchedule(s6);
List<Schedule> schedules = loader.loadSchedules();
+ Schedule sched = schedules.get(0);
Assert.assertEquals(6, schedules.size());
- Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
- Assert.assertEquals(44444, schedules.get(0).getSubmitTime());
- Assert.assertEquals("1d", Schedule.createPeriodString(schedules.get(0).getPeriod()));
- System.out.println("the options are " + schedules.get(0).getSchedOptions());
- Assert.assertEquals(true, schedules.get(0).getSchedOptions().get("hasSla"));
+ Assert.assertEquals("America/Los_Angeles", sched.getTimezone().getID());
+ Assert.assertEquals(44444, sched.getSubmitTime());
+ Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
+ FlowOptions fOpt = sched.getFlowOptions();
+ SlaOptions sOpt = sched.getSlaOptions();
+ Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
+ Assert.assertEquals("", sOpt.getSettings().get(0).getId());
+ Assert.assertEquals(Schedule.parsePeriodString("1h"), sOpt.getSettings().get(0).getDuration());
+ Assert.assertEquals(SlaRule.FINISH, sOpt.getSettings().get(0).getRule());
+ Assert.assertEquals(2, fOpt.getFailureEmails().size());
+ Assert.assertEquals(null, fOpt.getSuccessEmails());
+ Assert.assertEquals(2, fOpt.getDisabledJobs().size());
+ Assert.assertEquals(FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
+ Assert.assertEquals(null, fOpt.getFlowOverride());
}
@Test
@@ -150,29 +181,38 @@ public class JdbcScheduleLoaderTest {
JdbcScheduleLoader loader = createLoader();
- Map<String, Object> scheduleOptions = new HashMap<String, Object>();
- List<String> disabled = new ArrayList<String>();
- disabled.add("job1");
- disabled.add("job2");
- disabled.add("job3");
- List<String> failEmails = new ArrayList<String>();
- failEmails.add("email1");
- failEmails.add("email2");
- failEmails.add("email3");
- boolean hasSla = true;
- scheduleOptions.put("disabled", disabled);
- scheduleOptions.put("failEmails", failEmails);
- scheduleOptions.put("hasSla", hasSla);
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+ List<String> disabledJobs = new ArrayList<String>();
+ disabledJobs.add("job1");
+ disabledJobs.add("job2");
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ SlaSetting set1 = new SlaSetting();
+ List<SlaAction> actions = new ArrayList<SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ set1.setActions(actions);
+ set1.setId("");
+ set1.setDuration(Schedule.parsePeriodString("1h"));
+ set1.setRule(SlaRule.FINISH);
+ slaSets.add(set1);
+ FlowOptions flowOptions = new FlowOptions();
+ flowOptions.setFailureEmails(emails);
+ flowOptions.setDisabledJobs(disabledJobs);
+ SlaOptions slaOptions = new SlaOptions();
+ slaOptions.setSlaEmails(emails);
+ slaOptions.setSettings(slaSets);
- System.out.println("the options are " + scheduleOptions);
- Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
+ System.out.println("the flow options are " + flowOptions);
+ System.out.println("the sla options are " + slaOptions);
+ Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
loader.insertSchedule(s1);
- hasSla = false;
- scheduleOptions.put("hasSla", hasSla);
+ emails.add("email3");
+ slaOptions.setSlaEmails(emails);
- Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", scheduleOptions);
+ Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
loader.updateSchedule(s2);
@@ -182,8 +222,8 @@ public class JdbcScheduleLoaderTest {
Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
- System.out.println("the options are " + schedules.get(0).getSchedOptions());
- Assert.assertEquals(false, schedules.get(0).getSchedOptions().get("hasSla"));
+// System.out.println("the options are " + schedules.get(0).getSchedOptions());
+ Assert.assertEquals(3, schedules.get(0).getSlaOptions().getSlaEmails().size());
}
@Test
@@ -202,21 +242,29 @@ public class JdbcScheduleLoaderTest {
for(int i=0; i<stress; i++)
{
- Map<String, Object> scheduleOptions = new HashMap<String, Object>();
- List<String> disabled = new ArrayList<String>();
- disabled.add("job1");
- disabled.add("job2");
- disabled.add("job3");
- List<String> failEmails = new ArrayList<String>();
- failEmails.add("email1");
- failEmails.add("email2");
- failEmails.add("email3");
- boolean hasSla = true;
- scheduleOptions.put("disabled", disabled);
- scheduleOptions.put("failEmails", failEmails);
- scheduleOptions.put("hasSla", hasSla);
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+ List<String> disabledJobs = new ArrayList<String>();
+ disabledJobs.add("job1");
+ disabledJobs.add("job2");
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ SlaSetting set1 = new SlaSetting();
+ List<SlaAction> actions = new ArrayList<SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ set1.setActions(actions);
+ set1.setId("");
+ set1.setDuration(Schedule.parsePeriodString("1h"));
+ set1.setRule(SlaRule.FINISH);
+ slaSets.add(set1);
+ FlowOptions flowOptions = new FlowOptions();
+ flowOptions.setFailureEmails(emails);
+ flowOptions.setDisabledJobs(disabledJobs);
+ SlaOptions slaOptions = new SlaOptions();
+ slaOptions.setSlaEmails(emails);
+ slaOptions.setSettings(slaSets);
- Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
+ Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
schedules.add(s);
try {
loader.insertSchedule(s);
unit/java/azkaban/test/sla/JdbcSLALoaderTest.java 182(+182 -0)
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
new file mode 100644
index 0000000..02b7855
--- /dev/null
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -0,0 +1,182 @@
+package azkaban.test.sla;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.sla.JdbcSLALoader;
+import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLALoader;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.Props;
+
+
+
+public class JdbcSLALoaderTest {
+ private static boolean testDBExists;
+ //@TODO remove this and turn into local host.
+ private static final String host = "localhost";
+ private static final int port = 3306;
+ private static final String database = "azkaban2";
+ private static final String user = "azkaban";
+ private static final String password = "azkaban";
+ private static final int numConnections = 10;
+
+
+ @BeforeClass
+ public static void setupDB() {
+ DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ testDBExists = true;
+
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ CountHandler countHandler = new CountHandler();
+ QueryRunner runner = new QueryRunner();
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM active_sla", countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
+
+ clearDB();
+ }
+
+ @AfterClass
+ public static void clearDB() {
+ if (!testDBExists) {
+ return;
+ }
+
+ DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ QueryRunner runner = new QueryRunner();
+ try {
+ runner.update(connection, "DELETE FROM active_sla");
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
+ }
+
+ @Test
+ public void testInsertSLA() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ SLALoader loader = createLoader();
+
+ int execId = 1;
+ String jobName = "";
+ DateTime checkTime = new DateTime(11111);
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+// List<SlaRule> rules = new ArrayList<SlaRule>();
+// rules.add(SlaRule.SUCCESS);
+// rules.add(SlaRule.FINISH);
+ List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ SLA s = new SLA(execId, jobName, checkTime, emails, actions, null, SlaRule.FINISH);
+
+ loader.insertSLA(s);
+
+ List<SLA> allSLAs = loader.loadSLAs();
+ SLA fetchSLA = allSLAs.get(0);
+
+ Assert.assertTrue(allSLAs.size() == 1);
+ // Shouldn't be the same object.
+ Assert.assertTrue(s != fetchSLA);
+ Assert.assertEquals(s.getExecId(), fetchSLA.getExecId());
+ Assert.assertEquals(s.getJobName(), fetchSLA.getJobName());
+ Assert.assertEquals(s.getCheckTime(), fetchSLA.getCheckTime());
+ Assert.assertEquals(s.getEmails(), fetchSLA.getEmails());
+ Assert.assertEquals(s.getRule(), fetchSLA.getRule());
+ Assert.assertEquals(s.getActions().get(0), fetchSLA.getActions().get(0));
+
+
+ loader.removeSLA(s);
+
+ allSLAs = loader.loadSLAs();
+
+ Assert.assertTrue(allSLAs.size() == 0);
+ }
+
+ private SLALoader createLoader() {
+ Props props = new Props();
+ props.put("database.type", "mysql");
+
+ props.put("mysql.host", host);
+ props.put("mysql.port", port);
+ props.put("mysql.user", user);
+ props.put("mysql.database", database);
+ props.put("mysql.password", password);
+ props.put("mysql.numconnections", numConnections);
+
+ return new JdbcSLALoader(props);
+ }
+
+ private boolean isTestSetup() {
+ if (!testDBExists) {
+ System.err.println("Skipping DB test because Db not setup.");
+ return false;
+ }
+
+ System.out.println("Running DB test because Db setup.");
+ return true;
+ }
+
+ public static class CountHandler implements ResultSetHandler<Integer> {
+ @Override
+ public Integer handle(ResultSet rs) throws SQLException {
+ int val = 0;
+ while (rs.next()) {
+ val++;
+ }
+
+ return val;
+ }
+ }
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/utils/EmailMessageTest.java b/unit/java/azkaban/test/utils/EmailMessageTest.java
new file mode 100644
index 0000000..43bf216
--- /dev/null
+++ b/unit/java/azkaban/test/utils/EmailMessageTest.java
@@ -0,0 +1,46 @@
+package azkaban.test.utils;
+
+import java.io.IOException;
+
+import javax.mail.MessagingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.EmailMessage;
+
+public class EmailMessageTest {
+
+ String host = "";
+ String sender = "";
+ String user = "";
+ String password = "";
+
+ String toAddr = "";
+
+ private EmailMessage em;
+ @Before
+ public void setUp() throws Exception {
+ em = new EmailMessage(host, user, password);
+ em.setFromAddress(sender);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testSendEmail() throws IOException {
+ em.addToAddress(toAddr);
+ //em.addToAddress("cyu@linkedin.com");
+ em.setSubject("azkaban test email");
+ em.setBody("azkaban test email");
+ try {
+ em.sendEmail();
+ } catch (MessagingException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}