azkaban-uncached
Changes
src/java/azkaban/scheduler/JdbcScheduleLoader.java 219(+135 -84)
src/java/azkaban/scheduler/Schedule.java 416(+323 -93)
src/java/azkaban/scheduler/ScheduleManager.java 83(+66 -17)
src/java/azkaban/sla/JdbcSLALoader.java 330(+330 -0)
src/java/azkaban/sla/SLA.java 252(+252 -0)
src/java/azkaban/sla/SLALoader.java 14(+14 -0)
src/java/azkaban/sla/SlaMailer.java 228(+228 -0)
src/java/azkaban/sla/SLAManager.java 467(+467 -0)
src/java/azkaban/utils/EmailMessage.java 20(+13 -7)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 454(+402 -52)
src/sql/create_schedule_table.sql 2(+2 -0)
src/sql/create_sla_table.sql 10(+10 -0)
src/web/css/azkaban.css 292(+277 -15)
src/web/js/azkaban.flow.view.js 110(+19 -91)
src/web/js/azkaban.schedule.options.view.js 586(+586 -0)
src/web/js/azkaban.scheduled.view.js 353(+338 -15)
unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java 181(+136 -45)
unit/java/azkaban/test/sla/JdbcSLALoaderTest.java 182(+182 -0)
Details
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index a5add77..35208cb 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -631,7 +631,7 @@ public class ExecutorManager {
return flow;
}
- private boolean isFinished(ExecutableFlow flow) {
+ public boolean isFinished(ExecutableFlow flow) {
switch(flow.getStatus()) {
case SUCCEEDED:
case FAILED:
src/java/azkaban/scheduler/JdbcScheduleLoader.java 219(+135 -84)
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index caba099..ee9c812 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -17,12 +17,14 @@
package azkaban.scheduler;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.sql.DataSource;
@@ -36,58 +38,86 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA;
+import azkaban.sla.SLAManagerException;
+import azkaban.sla.JdbcSLALoader.EncodingType;
import azkaban.utils.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
public class JdbcScheduleLoader implements ScheduleLoader {
+
+ private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
- private DataSource dataSource;
- private static DateTimeFormatter FILE_DATEFORMAT = DateTimeFormat.forPattern("yyyy-MM-dd.HH.mm.ss.SSS");
- private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
+ public static enum EncodingType {
+ PLAIN(1), GZIP(2);
+
+ private int numVal;
+
+ EncodingType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EncodingType fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return PLAIN;
+ case 2:
+ return GZIP;
+ default:
+ return PLAIN;
+ }
+ }
+ }
- private static final String SCHEDULE = "schedule";
- // schedule ids
-// private static final String PROJECTGUID = "projectGuid";
-// private static final String FLOWGUID = "flowGuid";
-//
-// private static final String SCHEDULEID = "scheduleId";
- private static final String PROJECTID = "projectId";
- private static final String PROJECTNAME = "projectName";
- private static final String FLOWNAME = "flowName";
- // status
- private static final String STATUS = "status";
- // schedule info
- private static final String FIRSTSCHEDTIME = "firstSchedTime";
- private static final String TIMEZONE = "timezone";
- private static final String PERIOD = "period";
- private static final String LASTMODIFYTIME = "lastModifyTime";
- private static final String NEXTEXECTIME = "nextExecTime";
- // auditing info
- private static final String SUBMITTIME = "submitTime";
- private static final String SUBMITUSER = "userSubmit";
+ private DataSource dataSource;
+ private EncodingType defaultEncodingType = EncodingType.GZIP;
private static final String scheduleTableName = "schedules";
- private static String SELECT_SCHEDULE_BY_KEY =
- "SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
-
private static String SELECT_ALL_SCHEDULES =
- "SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user FROM " + scheduleTableName;
+ "SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
private static String INSERT_SCHEDULE =
- "INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user) values (?,?,?,?,?,?,?,?,?,?,?)";
+ "INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
private static String REMOVE_SCHEDULE_BY_KEY =
"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
private static String UPDATE_SCHEDULE_BY_KEY =
- "UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=? WHERE project_id=? AND flow_name=?";
+ "UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
private static String UPDATE_NEXT_EXEC_TIME =
"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+ private Connection getConnection() throws ScheduleManagerException {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new ScheduleManagerException("Error getting DB connection.", e);
+ }
+
+ return connection;
+ }
+
+ public EncodingType getDefaultEncodingType() {
+ return defaultEncodingType;
+ }
+ public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ this.defaultEncodingType = defaultEncodingType;
+ }
+
public JdbcScheduleLoader(Props props) {
String databaseType = props.getString("database.type");
@@ -106,15 +136,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
@Override
public List<Schedule> loadSchedules() throws ScheduleManagerException {
logger.info("Loading all schedules from db.");
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- } catch (SQLException e1) {
- logger.error("Failed to get db connection!");
- e1.printStackTrace();
- DbUtils.closeQuietly(connection);
- throw new ScheduleManagerException("Failed to get db connection!", e1);
- }
+ Connection connection = getConnection();
QueryRunner runner = new QueryRunner();
ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();
@@ -139,7 +161,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
}
else {
try {
- updateNextExecTime(sched, connection);
+ updateNextExecTime(sched);
} catch (Exception e) {
DbUtils.closeQuietly(connection);
throw new ScheduleManagerException("Update next execution time failed.", e);
@@ -172,10 +194,29 @@ public class JdbcScheduleLoader implements ScheduleLoader {
}
}
- @Override
+
public void insertSchedule(Schedule s) throws ScheduleManagerException {
logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
+ insertSchedule(s, defaultEncodingType);
+ }
+ public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+ String json = JSONUtils.toJSON(s.optionsToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
+ }
+
QueryRunner runner = new QueryRunner(dataSource);
try {
int inserts = runner.update(
@@ -190,7 +231,9 @@ public class JdbcScheduleLoader implements ScheduleLoader {
s.getLastModifyTime(),
s.getNextExecTime(),
s.getSubmitTime(),
- s.getSubmitUser());
+ s.getSubmitUser(),
+ encType.getNumVal(),
+ data);
if (inserts == 0) {
throw new ScheduleManagerException("No schedule has been inserted.");
}
@@ -200,8 +243,10 @@ public class JdbcScheduleLoader implements ScheduleLoader {
}
}
- private void updateNextExecTime(Schedule s, Connection connection) throws ScheduleManagerException
+ @Override
+ public void updateNextExecTime(Schedule s) throws ScheduleManagerException
{
+ Connection connection = getConnection();
QueryRunner runner = new QueryRunner();
try {
runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName());
@@ -214,6 +259,25 @@ public class JdbcScheduleLoader implements ScheduleLoader {
@Override
public void updateSchedule(Schedule s) throws ScheduleManagerException {
logger.info("Updating schedule " + s.getScheduleName() + " into db.");
+ updateSchedule(s, defaultEncodingType);
+ }
+
+ public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+ String json = JSONUtils.toJSON(s.optionsToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
+ }
QueryRunner runner = new QueryRunner(dataSource);
@@ -227,7 +291,9 @@ public class JdbcScheduleLoader implements ScheduleLoader {
s.getLastModifyTime(),
s.getNextExecTime(),
s.getSubmitTime(),
- s.getSubmitUser(),
+ s.getSubmitUser(),
+ encType.getNumVal(),
+ data,
s.getProjectId(),
s.getFlowName());
if (updates == 0) {
@@ -238,46 +304,6 @@ public class JdbcScheduleLoader implements ScheduleLoader {
throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
}
}
-
-// private Schedule fromJSON(HashMap<String, Object> obj) {
-// long projectGuid = Long.valueOf((String) obj.get(PROJECTGUID));
-// String projectId = (String) obj.get(PROJECTID);
-// long flowGuid = Long.valueOf((String) obj.get(FLOWGUID));
-// String flowId = (String) obj.get(FLOWID);
-// String status = (String) obj.get(STATUS);
-// long firstSchedTime = Long.valueOf((String) obj.get(FIRSTSCHEDTIME));
-// String timezone = (String) obj.get(TIMEZONE);
-// String period = (String) obj.get(PERIOD);
-// long lastModifyTime = Long.valueOf((String) obj.get(LASTMODIFYTIME));
-// long nextExecTime = Long.valueOf((String) obj.get(NEXTEXECTIME));
-// long submitTime = Long.valueOf((String) obj.get(SUBMITTIME));
-// String submitUser = (String) obj.get(SUBMITUSER);
-//
-// return new Schedule(projectId, flowId, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
-//
-// }
-//
-// private HashMap<String, Object> toJSON(Schedule s) {
-// HashMap<String, Object> object = new HashMap<String, Object>();
-//// object.put(PROJECTGUID, s.getProjectGuid());
-// object.put(SCHEDULEID, s.getScheduleId());
-// object.put(PROJECTID, s.getProjectId());
-//// object.put(FLOWGUID, s.getFlowGuid());
-// object.put(FLOWID, s.getFlowId());
-//
-// object.put(STATUS, s.getStatus());
-//
-// object.put(FIRSTSCHEDTIME, s.getFirstSchedTime());
-// object.put(TIMEZONE, s.getTimezone());
-// object.put(PERIOD, s.getPeriod());
-//
-// object.put(LASTMODIFYTIME, s.getLastModifyTime());
-// object.put(NEXTEXECTIME, s.getNextExecTime());
-// object.put(SUBMITTIME, s.getSubmitTime());
-// object.put(SUBMITUSER, s.getSubmitUser());
-//
-// return object;
-// }
public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
@Override
@@ -299,8 +325,33 @@ public class JdbcScheduleLoader implements ScheduleLoader {
long nextExecTime = rs.getLong(9);
long submitTime = rs.getLong(10);
String submitUser = rs.getString(11);
+ int encodingType = rs.getInt(12);
+ byte[] data = rs.getBytes(13);
+
+ FlowOptions flowOptions = null;
+ SlaOptions slaOptions = null;
+ if (data != null) {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object optsObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ flowOptions = Schedule.createFlowOptionFromObject(optsObj);
+ slaOptions = Schedule.createSlaOptionFromObject(optsObj);
+ } catch (IOException e) {
+ throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
+ }
+ }
- Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+ Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
schedules.add(s);
} while (rs.next());
src/java/azkaban/scheduler/Schedule.java 416(+323 -93)
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index a8c0c91..1c80c58 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,7 +16,10 @@
package azkaban.scheduler;
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -29,11 +32,153 @@ import org.joda.time.ReadablePeriod;
import org.joda.time.Seconds;
import org.joda.time.Weeks;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.Pair;
public class Schedule{
+
+ public static class FlowOptions {
+
+ public List<String> getFailureEmails() {
+ return failureEmails;
+ }
+ public void setFailureEmails(List<String> failureEmails) {
+ this.failureEmails = failureEmails;
+ }
+ public List<String> getSuccessEmails() {
+ return successEmails;
+ }
+ public void setSuccessEmails(List<String> successEmails) {
+ this.successEmails = successEmails;
+ }
+ public FailureAction getFailureAction() {
+ return failureAction;
+ }
+ public void setFailureAction(FailureAction failureAction) {
+ this.failureAction = failureAction;
+ }
+ public boolean isnotifyOnFirstFailure() {
+ return notifyOnFirstFailure;
+ }
+ public void setNotifyOnFirstFailure(boolean notifyOnFirstFailure) {
+ this.notifyOnFirstFailure = notifyOnFirstFailure;
+ }
+ public boolean isnotifyOnLastFailure() {
+ return notifyOnLastFailure;
+ }
+ public void setNotifyOnLastFailure(boolean notifyOnLastFailure) {
+ this.notifyOnLastFailure = notifyOnLastFailure;
+ }
+ public Map<String, String> getFlowOverride() {
+ return flowOverride;
+ }
+ public void setFlowOverride(Map<String, String> flowOverride) {
+ this.flowOverride = flowOverride;
+ }
+ public List<String> getDisabledJobs() {
+ return disabledJobs;
+ }
+ public void setDisabledJobs(List<String> disabledJobs) {
+ this.disabledJobs = disabledJobs;
+ }
+ private List<String> failureEmails;
+ private List<String> successEmails;
+ private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+ private boolean notifyOnFirstFailure;
+ private boolean notifyOnLastFailure;
+ Map<String, String> flowOverride;
+ private List<String> disabledJobs;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("failureEmails", failureEmails);
+ obj.put("successEmails", successEmails);
+ obj.put("failureAction", failureAction.toString());
+ obj.put("notifyOnFirstFailure", notifyOnFirstFailure);
+ obj.put("notifyOnLastFailure", notifyOnLastFailure);
+ obj.put("flowOverride", flowOverride);
+ obj.put("disabledJobs", disabledJobs);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static FlowOptions fromObject(Object object) {
+ if(object != null) {
+ FlowOptions flowOptions = new FlowOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ if(obj.containsKey("failureEmails")) {
+ flowOptions.setFailureEmails((List<String>) obj.get("failureEmails"));
+ }
+ if(obj.containsKey("successEmails")) {
+ flowOptions.setSuccessEmails((List<String>) obj.get("SuccessEmails"));
+ }
+ if(obj.containsKey("failureAction")) {
+ flowOptions.setFailureAction(FailureAction.valueOf((String)obj.get("failureAction")));
+ }
+ if(obj.containsKey("notifyOnFirstFailure")) {
+ flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnFirstFailure"));
+ }
+ if(obj.containsKey("notifyOnLastFailure")) {
+ flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnLastFailure"));
+ }
+ if(obj.containsKey("flowOverride")) {
+ flowOptions.setFlowOverride((Map<String, String>) obj.get("flowOverride"));
+ }
+ if(obj.containsKey("disabledJobs")) {
+ flowOptions.setDisabledJobs((List<String>) obj.get("disabledJobs"));
+ }
+ return flowOptions;
+ }
+ return null;
+ }
+ }
+ public static class SlaOptions {
+
+ public List<String> getSlaEmails() {
+ return slaEmails;
+ }
+ public void setSlaEmails(List<String> slaEmails) {
+ this.slaEmails = slaEmails;
+ }
+ public List<SlaSetting> getSettings() {
+ return settings;
+ }
+ public void setSettings(List<SlaSetting> settings) {
+ this.settings = settings;
+ }
+ private List<String> slaEmails;
+ private List<SlaSetting> settings;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("slaEmails", slaEmails);
+ List<Object> slaSettings = new ArrayList<Object>();
+ for(SlaSetting s : settings) {
+ slaSettings.add(s.toObject());
+ }
+ obj.put("settings", slaSettings);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static SlaOptions fromObject(Object object) {
+ if(object != null) {
+ SlaOptions slaOptions = new SlaOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ for(Object set: (List<Object>)obj.get("settings")) {
+ slaSets.add(SlaSetting.fromObject(set));
+ }
+ slaOptions.setSettings(slaSets);
+ return slaOptions;
+ }
+ return null;
+ }
+
+ }
+
// private long projectGuid;
// private long flowGuid;
@@ -51,6 +196,9 @@ public class Schedule{
private String status;
private long submitTime;
+ private FlowOptions flowOptions;
+ private SlaOptions slaOptions;
+
public Schedule(
int projectId,
String projectName,
@@ -75,31 +223,84 @@ public class Schedule{
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
+ this.flowOptions = null;
+ this.slaOptions = null;
}
-
+
public Schedule(
- int projectId,
- String projectName,
- String flowName,
- String status,
- long firstSchedTime,
- String timezone,
- String period,
- long lastModifyTime,
- long nextExecTime,
- long submitTime,
- String submitUser) {
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ String timezoneId,
+ String period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ FlowOptions flowOptions,
+ SlaOptions slaOptions
+ ) {
this.projectId = projectId;
this.projectName = projectName;
this.flowName = flowName;
this.firstSchedTime = firstSchedTime;
- this.timezone = DateTimeZone.forID(timezone);
+ this.timezone = DateTimeZone.forID(timezoneId);
this.lastModifyTime = lastModifyTime;
this.period = parsePeriodString(period);
this.nextExecTime = nextExecTime;
this.submitUser = submitUser;
this.status = status;
this.submitTime = submitTime;
+ this.flowOptions = flowOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ FlowOptions flowOptions,
+ SlaOptions slaOptions
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = timezone;
+ this.lastModifyTime = lastModifyTime;
+ this.period = period;
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.flowOptions = flowOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public FlowOptions getFlowOptions() {
+ return flowOptions;
+ }
+
+ public void setFlowOptions(FlowOptions flowOptions) {
+ this.flowOptions = flowOptions;
+ }
+
+ public SlaOptions getSlaOptions() {
+ return slaOptions;
+ }
+
+ public void setSlaOptions(SlaOptions slaOptions) {
+ this.slaOptions = slaOptions;
}
public String getScheduleName() {
@@ -155,85 +356,77 @@ public class Schedule{
}
public boolean updateTime() {
- if (new DateTime(nextExecTime).isAfterNow()) {
- return true;
- }
-
- if (period != null) {
- DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
-
- this.nextExecTime = nextTime.getMillis();
- return true;
- }
-
- return false;
- }
-// public String getFlowId(){
-// return this.scheduleId.split("\\.")[1];
-// }
-//
-// public String getProjectId(){
-// return this.scheduleId.split("\\.")[0];
-// }
-
+ if (new DateTime(nextExecTime).isAfterNow()) {
+ return true;
+ }
+
+ if (period != null) {
+ DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
+
+ this.nextExecTime = nextTime.getMillis();
+ return true;
+ }
+
+ return false;
+ }
- private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
- DateTime now = new DateTime();
- DateTime date = new DateTime(scheduleTime).withZone(timezone);
- int count = 0;
- while (!now.isBefore(date)) {
- if (count > 100000) {
- throw new IllegalStateException(
- "100000 increments of period did not get to present time.");
- }
-
- if (period == null) {
- break;
- } else {
- date = date.plus(period);
- }
-
- count += 1;
- }
-
- return date;
- }
-
- public static ReadablePeriod parsePeriodString(String periodStr) {
- ReadablePeriod period;
- char periodUnit = periodStr.charAt(periodStr.length() - 1);
- if (periodUnit == 'n') {
- return null;
- }
-
- int periodInt = Integer.parseInt(periodStr.substring(0,
- periodStr.length() - 1));
- switch (periodUnit) {
- case 'M':
- period = Months.months(periodInt);
- break;
- case 'w':
- period = Weeks.weeks(periodInt);
- break;
- case 'd':
- period = Days.days(periodInt);
- break;
- case 'h':
- period = Hours.hours(periodInt);
- break;
- case 'm':
- period = Minutes.minutes(periodInt);
- break;
- case 's':
- period = Seconds.seconds(periodInt);
- break;
- default:
- throw new IllegalArgumentException("Invalid schedule period unit '"
- + periodUnit);
- }
-
- return period;
- }
+ private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
+ DateTime now = new DateTime();
+ DateTime date = new DateTime(scheduleTime).withZone(timezone);
+ int count = 0;
+ while (!now.isBefore(date)) {
+ if (count > 100000) {
+ throw new IllegalStateException(
+ "100000 increments of period did not get to present time.");
+ }
+
+ if (period == null) {
+ break;
+ } else {
+ date = date.plus(period);
+ }
+
+ count += 1;
+ }
+
+ return date;
+ }
+
+ public static ReadablePeriod parsePeriodString(String periodStr) {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0,
+ periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit);
+ }
+
+ return period;
+ }
public static String createPeriodString(ReadablePeriod period) {
String periodStr = "n";
@@ -264,7 +457,44 @@ public class Schedule{
return periodStr;
}
-
+
+ public Map<String,Object> optionsToObject() {
+ if(flowOptions != null || slaOptions != null) {
+ HashMap<String, Object> schedObj = new HashMap<String, Object>();
+
+ if(flowOptions != null) {
+ schedObj.put("flowOptions", flowOptions.toObject());
+ }
+ if(slaOptions != null) {
+ schedObj.put("slaOptions", slaOptions.toObject());
+ }
+
+ return schedObj;
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static FlowOptions createFlowOptionFromObject(Object obj) {
+ if(obj != null) {
+ Map<String, Object> options = (HashMap<String, Object>) obj;
+ if(options.containsKey("flowOptions")) {
+ return FlowOptions.fromObject(options.get("flowOptions"));
+ }
+ }
+ return null;
+ }
+ @SuppressWarnings("unchecked")
+ public static SlaOptions createSlaOptionFromObject(Object obj) {
+ if(obj != null) {
+ Map<String, Object> options = (HashMap<String, Object>) obj;
+ if(options.containsKey("slaOptions")) {
+ return SlaOptions.fromObject(options.get("slaOptions"));
+ }
+ }
+ return null;
+ }
+
}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
index 4b55b4e..9caed47 100644
--- a/src/java/azkaban/scheduler/ScheduleLoader.java
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -16,6 +16,7 @@
package azkaban.scheduler;
+import java.sql.Connection;
import java.util.List;
public interface ScheduleLoader {
@@ -28,4 +29,6 @@ public interface ScheduleLoader {
public void removeSchedule(Schedule s) throws ScheduleManagerException;
+ public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
+
}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduleManager.java 83(+66 -17)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ef0f076..ad67def 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -17,6 +17,7 @@
package azkaban.scheduler;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -34,6 +35,8 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
import azkaban.jobExecutor.utils.JobExecutionException;
@@ -41,6 +44,12 @@ import azkaban.project.Project;
import azkaban.project.ProjectManager;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.Pair;
@@ -60,6 +69,7 @@ public class ScheduleManager {
private final ScheduleRunner runner;
private final ExecutorManager executorManager;
private final ProjectManager projectManager;
+ private final SLAManager slaManager;
/**
* Give the schedule manager a loader class that will properly load the
@@ -69,10 +79,12 @@ public class ScheduleManager {
*/
public ScheduleManager(ExecutorManager executorManager,
ProjectManager projectManager,
+ SLAManager slaManager,
ScheduleLoader loader)
{
this.executorManager = executorManager;
this.projectManager = projectManager;
+ this.slaManager = slaManager;
this.loader = loader;
this.runner = new ScheduleRunner();
@@ -169,9 +181,11 @@ public class ScheduleManager {
final long lastModifyTime,
final long nextExecTime,
final long submitTime,
- final String submitUser
+ final String submitUser,
+ final FlowOptions flowOptions,
+ final SlaOptions slaOptions
) {
- Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+ Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ _dateFormat.print(firstSchedTime) + " with a period of "
+ period == null ? "(non-recurring)" : period);
@@ -362,19 +376,33 @@ public class ScheduleManager {
// Create ExecutableFlow
ExecutableFlow exflow = new ExecutableFlow(flow);
exflow.setSubmitUser(runningSched.getSubmitUser());
-
- // TODO make disabled in scheduled flow
- // Map<String, String> paramGroup =
- // this.getParamGroup(req, "disabled");
- // for (Map.Entry<String, String> entry:
- // paramGroup.entrySet()) {
- // boolean nodeDisabled =
- // Boolean.parseBoolean(entry.getValue());
- // exflow.setStatus(entry.getKey(),
- // nodeDisabled ? Status.DISABLED :
- // Status.READY);
- // }
-
+
+ FlowOptions flowOptions = runningSched.getFlowOptions();
+
+ if(flowOptions != null) {
+ if (flowOptions.getFailureAction() != null) {
+ exflow.setFailureAction(flowOptions.getFailureAction());
+ }
+ if (flowOptions.getFailureEmails() != null) {
+ exflow.setFailureEmails(flowOptions.getFailureEmails());
+ }
+ if (flowOptions.getSuccessEmails() != null) {
+ exflow.setSuccessEmails(flowOptions.getSuccessEmails());
+ }
+ exflow.setNotifyOnFirstFailure(flowOptions.isnotifyOnFirstFailure());
+ exflow.setNotifyOnLastFailure(flowOptions.isnotifyOnLastFailure());
+
+ exflow.addFlowParameters(flowOptions.getFlowOverride());
+
+ List<String> disabled = flowOptions.getDisabledJobs();
+ // Setup disabled
+ if(disabled != null) {
+ for (String job : disabled) {
+ exflow.setNodeStatus(job, Status.DISABLED);
+ }
+ }
+ }
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -383,9 +411,30 @@ public class ScheduleManager {
logger.error(e.getMessage());
return;
}
- } catch (JobExecutionException e) {
- logger.info("Could not run flow. " + e.getMessage());
+
+ SlaOptions slaOptions = runningSched.getSlaOptions();
+
+ if(slaOptions != null) {
+ // submit flow slas
+ List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+ for(SlaSetting set : slaOptions.getSettings()) {
+ if(set.getId().equals("")) {
+ DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+ slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+ }
+ else {
+ jobsettings.add(set);
+ }
+ }
+ if(jobsettings.size() > 0) {
+ slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+ }
+ }
+
+ } catch (Exception e) {
+ logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
+
}
removeRunnerSchedule(runningSched);
src/java/azkaban/sla/JdbcSLALoader.java 330(+330 -0)
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
new file mode 100644
index 0000000..cf6cd2f
--- /dev/null
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -0,0 +1,330 @@
+package azkaban.sla;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+public class JdbcSLALoader implements SLALoader {
+
+ private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
+
+ /**
+ * Used for when we store text data. Plain uses UTF8 encoding.
+ */
+ public static enum EncodingType {
+ PLAIN(1), GZIP(2);
+
+ private int numVal;
+
+ EncodingType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EncodingType fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return PLAIN;
+ case 2:
+ return GZIP;
+ default:
+ return PLAIN;
+ }
+ }
+ }
+
+ private DataSource dataSource;
+ private EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ private static String slaTblName = "active_sla";
+
+ final private static String LOAD_ALL_SLA =
+ "SELECT exec_id, job_name, check_time, rule, enc_type, options FROM " + slaTblName;
+
+ final private static String INSERT_SLA =
+ "INSERT INTO " + slaTblName + " ( exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+
+// final private static String UPDATE_SLA =
+// "UPDATE " + slaTblName + " SET exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+//
+ // use "" for flow
+ final private static String REMOVE_SLA =
+ "DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
+
+ public JdbcSLALoader(Props props) {
+ String databaseType = props.getString("database.type");
+
+ if (databaseType.equals("mysql")) {
+ int port = props.getInt("mysql.port");
+ String host = props.getString("mysql.host");
+ String database = props.getString("mysql.database");
+ String user = props.getString("mysql.user");
+ String password = props.getString("mysql.password");
+ int numConnections = props.getInt("mysql.numconnections");
+
+ dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ }
+ }
+
+ private Connection getConnection() throws SLAManagerException {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new SLAManagerException("Error getting DB connection.", e);
+ }
+ return connection;
+ }
+
+ public EncodingType getDefaultEncodingType() {
+ return defaultEncodingType;
+ }
+
+ public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ this.defaultEncodingType = defaultEncodingType;
+ }
+
+ @Override
+ public List<SLA> loadSLAs() throws SLAManagerException {
+ logger.info("Loading all SLAs from db.");
+
+ Connection connection = getConnection();
+ List<SLA> SLAs;
+ try{
+ SLAs= loadSLAs(connection, defaultEncodingType);
+ }
+ catch(SLAManagerException e) {
+ throw new SLAManagerException("Failed to load SLAs" + e.getCause());
+ }
+ finally{
+ DbUtils.closeQuietly(connection);
+ }
+
+ logger.info("Loaded " + SLAs.size() + " SLAs.");
+
+ return SLAs;
+
+ }
+
+ public List<SLA> loadSLAs(Connection connection, EncodingType encType) throws SLAManagerException {
+ logger.info("Loading all SLAs from db.");
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<SLA>> handler = new SLAResultHandler();
+ List<SLA> SLAs;
+
+ try {
+ SLAs = runner.query(connection, LOAD_ALL_SLA, handler);
+ } catch (SQLException e) {
+ logger.error(LOAD_ALL_SLA + " failed.");
+ throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+ }
+
+ return SLAs;
+
+ }
+
+ @Override
+ public void removeSLA(SLA s) throws SLAManagerException {
+ Connection connection = getConnection();
+ try{
+ removeSLA(connection, s);
+ }
+ catch(SLAManagerException e) {
+ logger.error(LOAD_ALL_SLA + " failed.");
+ throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ }
+
+ private void removeSLA(Connection connection, SLA s) throws SLAManagerException {
+
+ logger.info("Removing SLA " + s.toString() + " from db.");
+
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ int removes = runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
+ if (removes == 0) {
+ throw new SLAManagerException("No schedule has been removed.");
+ }
+ } catch (SQLException e) {
+ logger.error("Remove SLA failed. " + s.toString());
+ throw new SLAManagerException("Remove SLA " + s.toString() + " from db failed. "+ e.getCause());
+ }
+
+ }
+
+ @Override
+ public void insertSLA(SLA s) throws SLAManagerException {
+ Connection connection = getConnection();
+ try{
+ insertSLA(connection, s, defaultEncodingType);
+ }
+ catch(SLAManagerException e){
+ logger.error("Insert SLA failed. " + s.toString());
+ throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void insertSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+
+ logger.debug("Inserting new SLA into DB. " + s.toString());
+
+ QueryRunner runner = new QueryRunner();
+
+ String json = JSONUtils.toJSON(s.optionToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new SLAManagerException("Error encoding the sla options.");
+ }
+
+ try {
+ int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+
+ if (inserts == 0) {
+ throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+ }
+ } catch (SQLException e) {
+ logger.error("Insert SLA failed.");
+ throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+ }
+
+ }
+
+// @Override
+// public void updateSLA(SLA s) throws SLAManagerException {
+// Connection connection = getConnection();
+// try{
+// insertSLA(connection, s, defaultEncodingType);
+// }
+// catch(SLAManagerException e){
+// logger.error("Insert SLA failed. " + s.toString());
+// throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+// }
+// finally {
+// DbUtils.closeQuietly(connection);
+// }
+// }
+//
+// private void updateSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+//
+// logger.debug("Inserting new SLA into DB. " + s.toString());
+//
+// QueryRunner runner = new QueryRunner();
+//
+// String json = JSONUtils.toJSON(s.optionToObject());
+// byte[] data = null;
+// try {
+// byte[] stringData = json.getBytes("UTF-8");
+// data = stringData;
+//
+// if (encType == EncodingType.GZIP) {
+// data = GZIPUtils.gzipBytes(stringData);
+// }
+// logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+// }
+// catch (IOException e) {
+// throw new SLAManagerException("Error encoding the sla options.");
+// }
+//
+// try {
+// int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+//
+// if (inserts == 0) {
+// throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+// }
+// } catch (SQLException e) {
+// logger.error("Insert SLA failed.");
+// throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+// }
+//
+// }
+
+ public class SLAResultHandler implements ResultSetHandler<List<SLA>> {
+ @Override
+ public List<SLA> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<SLA>emptyList();
+ }
+
+ ArrayList<SLA> SLAs = new ArrayList<SLA>();
+
+ do {
+ int execId = rs.getInt(1);
+ String jobName = rs.getString(2);
+ DateTime checkTime = new DateTime(rs.getLong(3));
+ SlaRule rule = SlaRule.fromInteger(rs.getInt(4));
+ int encodingType = rs.getInt(5);
+ byte[] data = rs.getBytes(6);
+
+ SLA s = null;
+
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object optsObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ s = SLA.createSlaFromObject(execId, jobName, checkTime, rule, optsObj);
+ } catch (IOException e) {
+ throw new SQLException("Error reconstructing SLA options. " + execId + " " + jobName + " " + checkTime.toDateTimeISO() + e.getCause());
+ }
+ SLAs.add(s);
+
+ } while (rs.next());
+
+ return SLAs;
+ }
+
+ }
+
+}
src/java/azkaban/sla/SLA.java 252(+252 -0)
diff --git a/src/java/azkaban/sla/SLA.java b/src/java/azkaban/sla/SLA.java
new file mode 100644
index 0000000..eb08229
--- /dev/null
+++ b/src/java/azkaban/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.Schedule;
+
+public class SLA {
+
+ public static enum SlaRule {
+ SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+ private int numVal;
+
+ SlaRule(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaRule fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return SUCCESS;
+ case 2:
+ return FINISH;
+ case 3:
+ return WAITANDCHECKJOB;
+ default:
+ return SUCCESS;
+ }
+ }
+ }
+
+ public static enum SlaAction {
+ EMAIL(1), KILL(2);
+
+ private int numVal;
+
+ SlaAction(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaAction fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return EMAIL;
+ case 2:
+ return KILL;
+ default:
+ return EMAIL;
+ }
+ }
+ }
+
+ public static class SlaSetting {
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public ReadablePeriod getDuration() {
+ return duration;
+ }
+ public void setDuration(ReadablePeriod duration) {
+ this.duration = duration;
+ }
+ public SlaRule getRule() {
+ return rule;
+ }
+ public void setRule(SlaRule rule) {
+ this.rule = rule;
+ }
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+ public void setActions(List<SlaAction> actions) {
+ this.actions = actions;
+ }
+
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("id", id);
+ obj.put("duration", Schedule.createPeriodString(duration));
+// List<String> rulesObj = new ArrayList<String>();
+// for(SlaRule rule : rules) {
+// rulesObj.add(rule.toString());
+// }
+// obj.put("rules", rulesObj);
+ obj.put("rule", rule.toString());
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ obj.put("actions", actionsObj);
+ return obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaSetting fromObject(Object obj) {
+ Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+ String subId = (String) slaObj.get("id");
+ ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+// List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+// List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+// for(String rule : rulesObj) {
+// slaRules.add(SlaRule.valueOf(rule));
+// }
+ SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+
+ SlaSetting ret = new SlaSetting();
+ ret.setId(subId);
+ ret.setDuration(dur);
+ ret.setRule(slaRule);
+ ret.setActions(slaActs);
+ return ret;
+ }
+
+ private String id;
+ private ReadablePeriod duration;
+ private SlaRule rule = SlaRule.SUCCESS;
+ private List<SlaAction> actions;
+ }
+
+ private int execId;
+ private String jobName;
+ private DateTime checkTime;
+ private List<String> emails;
+ private List<SlaAction> actions;
+ private List<SlaSetting> jobSettings;
+ private SlaRule rule;
+
+ public SLA(
+ int execId,
+ String jobName,
+ DateTime checkTime,
+ List<String> emails,
+ List<SlaAction> slaActions,
+ List<SlaSetting> jobSettings,
+ SlaRule slaRule
+ ) {
+ this.execId = execId;
+ this.jobName = jobName;
+ this.checkTime = checkTime;
+ this.emails = emails;
+ this.actions = slaActions;
+ this.jobSettings = jobSettings;
+ this.rule = slaRule;
+ }
+
+ public int getExecId() {
+ return execId;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public DateTime getCheckTime() {
+ return checkTime;
+ }
+
+ public List<String> getEmails() {
+ return emails;
+ }
+
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+
+ public List<SlaSetting> getJobSettings() {
+ return jobSettings;
+ }
+
+ public SlaRule getRule() {
+ return rule;
+ }
+
+ public String toString() {
+ return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+ }
+
+ public Map<String,Object> optionToObject() {
+ HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+ slaObj.put("emails", emails);
+// slaObj.put("rule", rule.toString());
+
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ slaObj.put("actions", actionsObj);
+
+ if(jobSettings != null && jobSettings.size() > 0) {
+ List<Object> settingsObj = new ArrayList<Object>();
+ for(SlaSetting set : jobSettings) {
+ settingsObj.add(set.toObject());
+ }
+ slaObj.put("jobSettings", settingsObj);
+ }
+
+
+ return slaObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+
+ HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+ List<String> emails = (List<String>)slaObj.get("emails");
+// SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+ List<SlaSetting> jobSets = null;
+ if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+ jobSets = new ArrayList<SLA.SlaSetting>();
+ for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+ SlaSetting jobSet = SlaSetting.fromObject(set);
+ jobSets.add(jobSet);
+ }
+ }
+
+ return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+ }
+
+ public void setCheckTime(DateTime time) {
+ this.checkTime = time;
+ }
+
+}
src/java/azkaban/sla/SLALoader.java 14(+14 -0)
diff --git a/src/java/azkaban/sla/SLALoader.java b/src/java/azkaban/sla/SLALoader.java
new file mode 100644
index 0000000..2c32b33
--- /dev/null
+++ b/src/java/azkaban/sla/SLALoader.java
@@ -0,0 +1,14 @@
+package azkaban.sla;
+
+import java.util.List;
+
+public interface SLALoader {
+
+ public void insertSLA(SLA s) throws SLAManagerException;
+
+ public List<SLA> loadSLAs() throws SLAManagerException;
+
+ public void removeSLA(SLA s) throws SLAManagerException;
+
+// public void updateSLA(SLA s) throws SLAManagerException;
+}
src/java/azkaban/sla/SlaMailer.java 228(+228 -0)
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
new file mode 100644
index 0000000..17a67d0
--- /dev/null
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -0,0 +1,228 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.mail.MessagingException;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.sla.SLA;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class SlaMailer {
+ private static Logger logger = Logger.getLogger(SlaMailer.class);
+
+ private boolean testMode = false;
+ private String clientHostname;
+ private String clientPortNumber;
+
+ private String mailHost;
+ private String mailUser;
+ private String mailPassword;
+ private String mailSender;
+ private String azkabanName;
+
+ public SlaMailer(Props props) {
+ this.azkabanName = props.getString("azkaban.name", "azkaban");
+ this.mailHost = props.getString("mail.host", "localhost");
+ this.mailUser = props.getString("mail.user", "");
+ this.mailPassword = props.getString("mail.password", "");
+ this.mailSender = props.getString("mail.sender", "");
+
+ this.clientHostname = props.getString("jetty.hostname", "localhost");
+ this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+
+ testMode = props.getBoolean("test.mode", false);
+ }
+
+ public void sendFirstErrorMessage(ExecutableFlow flow) {
+ List<String> emailList = flow.getFailureEmails();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+
+ if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+ message.println("This flow is set to cancel all currently running jobs.");
+ }
+ else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
+ message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+ }
+ else {
+ message.println("This flow is set to complete all currently running jobs before stopping.");
+ }
+
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+ }
+
+ message.println("</ul>");
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
+ public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+ List<String> emailList = flow.getFailureEmails();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+ }
+ for (String reasons: extraReasons) {
+ message.println("<li>" + reasons + "</li>");
+ }
+
+ message.println("</ul>");
+
+
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
+ public void sendSuccessEmail(ExecutableFlow flow) {
+ List<String> emailList = flow.getSuccessEmails();
+
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+
+ message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+
+ private List<String> findFailedJobs(ExecutableFlow flow) {
+ ArrayList<String> failedJobs = new ArrayList<String>();
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ if (node.getStatus() == Status.FAILED) {
+ failedJobs.add(node.getJobId());
+ }
+ }
+
+ return failedJobs;
+ }
+
+ public void sendSlaEmail(SLA s, String ... extraReasons) {
+ List<String> emailList = s.getEmails();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("SLA violation on " + azkabanName);
+
+// message.println("<h2 style=\"color:#FF0000\"> Execution '" + s.getExecId() + "' of flow '" + flow.getFlowId() + "' failed to meet SLA on " + azkabanName + "</h2>");
+// message.println("<table>");
+// message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+// message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+// message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+// message.println("</table>");
+// message.println("");
+// String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+// message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+//
+// message.println("");
+// message.println("<h3>Reason</h3>");
+// List<String> failedJobs = findFailedJobs(flow);
+// message.println("<ul>");
+// for (String jobId : failedJobs) {
+// message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+// }
+ for (String reasons: extraReasons) {
+ message.println("<li>" + reasons + "</li>");
+ }
+
+ message.println("</ul>");
+
+ if (!testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed" , e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
src/java/azkaban/sla/SLAManager.java 467(+467 -0)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
new file mode 100644
index 0000000..ae17595
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -0,0 +1,467 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.velocity.runtime.parser.node.GetExecutor;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorMailer;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
+import azkaban.user.User;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+
+/**
+ * The SLAManager stores and checks the SLA (Service Level Agreement). It uses a single thread
+ * instead and waits until correct check time for the flow, and individual jobs in the flow if their SLA is set.
+ */
+public class SLAManager {
+ private static Logger logger = Logger.getLogger(SLAManager.class);
+
+ private SLALoader loader;
+
+ private final SLARunner runner;
+ private final SLAPreRunner prerunner;
+ private final ExecutorManager executorManager;
+ private SlaMailer mailer;
+
+ /**
+ * Give the sla manager a loader class that will properly load the
+ * sla.
+ *
+ * @param loader
+ * @throws SLAManagerException
+ */
+ public SLAManager(ExecutorManager executorManager,
+ SLALoader loader,
+ Props props) throws SLAManagerException
+ {
+ this.executorManager = executorManager;
+ this.loader = loader;
+ this.mailer = new SlaMailer(props);
+ this.runner = new SLARunner();
+ this.prerunner = new SLAPreRunner();
+
+ List<SLA> SLAList = null;
+ try {
+ SLAList = loader.loadSLAs();
+ } catch (SLAManagerException e) {
+ // TODO Auto-generated catch block
+ throw e;
+ }
+
+ for (SLA sla : SLAList) {
+ runner.addRunnerSLA(sla);
+ }
+
+ this.runner.start();
+ }
+
+ /**
+ * Shutdowns the sla thread. After shutdown, it may not be safe to use
+ * it again.
+ */
+ public void shutdown() {
+ this.runner.shutdown();
+ this.prerunner.shutdown();
+ }
+
+ /**
+ * Removes the flow from the SLA if it exists.
+ *
+ * @param id
+ * @throws SLAManagerException
+ */
+ public void removeSLA(SLA s) throws SLAManagerException {
+ logger.info("Removing SLA " + s.toString());
+ runner.removeRunnerSLA(s);
+ loader.removeSLA(s);
+ }
+
+ public void submitSla(
+ int execId,
+ String id,
+ DateTime checkTime,
+ List<String> emails,
+ List<SlaAction> slaActions,
+ List<SlaSetting> jobSettings,
+ SlaRule slaRule
+ ) throws SLAManagerException {
+ SLA s = new SLA(execId, id, checkTime, emails, slaActions, jobSettings, slaRule);
+ logger.info("Submitting SLA " + s.toString());
+ try {
+ loader.insertSLA(s);
+ runner.addRunnerSLA(s);
+ }
+ catch (SLAManagerException e) {
+ throw new SLAManagerException("Failed to add new SLA!" + e.getCause());
+ }
+ }
+
+ /**
+ * Thread that simply invokes the checking of flows when the SLA is
+ * ready.
+ *
+ */
+ public class SLARunner extends Thread {
+ private final PriorityBlockingQueue<SLA> SLAs;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 60000;
+
+ public SLARunner() {
+ SLAs = new PriorityBlockingQueue<SLA>(1,new SLAComparator());
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down SLA runner thread");
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ /**
+ * Return a list of flow with SLAs
+ *
+ * @return
+ */
+ protected synchronized List<SLA> getRunnerSLAs() {
+ return new ArrayList<SLA>(SLAs);
+ }
+
+ /**
+ * Adds SLA into runner and then interrupts so it will update
+ * its wait time.
+ *
+ * @param flow
+ */
+ public synchronized void addRunnerSLA(SLA s) {
+ logger.info("Adding " + s + " to SLA runner.");
+ SLAs.add(s);
+ this.interrupt();
+ }
+
+ /**
+ * Remove runner SLA. Does not interrupt.
+ *
+ * @param flow
+ * @throws SLAManagerException
+ */
+ public synchronized void removeRunnerSLA(SLA s) {
+ logger.info("Removing " + s + " from the SLA runner.");
+ SLAs.remove(s);
+ }
+
+ public void run() {
+ while (stillAlive.get()) {
+ synchronized (this) {
+ try {
+ // TODO clear up the exception handling
+ SLA s = SLAs.peek();
+
+ if (s == null) {
+ // If null, wake up every minute or so to see if
+ // there's something to do. Most likely there will not be.
+ try {
+ this.wait(TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // interruption should occur when items are added or removed from the queue.
+ }
+ } else {
+ // We've passed the flow execution time, so we will run.
+ if (!(new DateTime(s.getCheckTime())).isAfterNow()) {
+ // Run flow. The invocation of flows should be quick.
+ SLA runningSLA = SLAs.poll();
+
+ logger.info("Checking sla " + runningSLA.toString() );
+
+ int execId = s.getExecId();
+ ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
+
+ if(runningSLA.getJobName().equals("") && runningSLA.getRule().equals(SlaRule.WAITANDCHECKJOB)) {
+ // do the checking of potential jobsla submissions
+ List<SlaSetting> jobSettings = runningSLA.getJobSettings();
+ List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
+ for(SlaSetting set : jobSettings) {
+ ExecutableNode node = exflow.getExecutableNode(set.getId());
+ if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
+ submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+ removeSettings.add(set);
+ logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+ }
+ }
+ for(SlaSetting remove : removeSettings) {
+ jobSettings.remove(remove);
+ }
+ if(jobSettings.size() == 0) {
+ removeRunnerSLA(runningSLA);
+ loader.removeSLA(runningSLA);
+ }
+ else {
+ removeRunnerSLA(runningSLA);
+ loader.removeSLA(runningSLA);
+ runningSLA.setCheckTime(runningSLA.getCheckTime().plusMillis(TIMEOUT_MS));
+ addRunnerSLA(runningSLA);
+ loader.insertSLA(runningSLA);
+ }
+ }
+ else {
+ if(!metSla(runningSLA, exflow)) {
+ takeSLAActions(runningSLA, exflow);
+ }
+
+
+ removeRunnerSLA(runningSLA);
+ loader.removeSLA(runningSLA);
+ }
+ } else {
+ // wait until flow run
+ long millisWait = Math.max(0, s.getCheckTime().getMillis() - (new DateTime()).getMillis());
+ try {
+ this.wait(Math.min(millisWait, TIMEOUT_MS));
+ } catch (InterruptedException e) {
+ // interruption should occur when items are
+ // added or removed from the queue.
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected exception has been thrown in scheduler", e);
+ } catch (Throwable e) {
+ logger.error("Unexpected throwable has been thrown in scheduler", e);
+ }
+ }
+ }
+ }
+
+ private boolean metSla(SLA s, ExecutableFlow exflow) {
+ SlaRule rule = s.getRule();
+ long finishTime;
+ Status status;
+ if(s.getJobName().equals("")) {
+ finishTime = exflow.getEndTime();
+ status = exflow.getStatus();
+ }
+ else {
+ ExecutableNode exnode = exflow.getExecutableNode(s.getJobName());
+ finishTime = exnode.getEndTime();
+ status = exnode.getStatus();
+ }
+
+ switch(rule) {
+ case FINISH: // check finish time
+ return finishTime != -1 && finishTime < s.getCheckTime().getMillis();
+ case SUCCESS: // check finish and successful
+ return status == Status.SUCCEEDED && finishTime < s.getCheckTime().getMillis();
+ default:
+ logger.error("Unknown SLA rules!");
+ return false;
+ }
+ }
+
+ /**
+ * Class to sort the sla based on time.
+ *
+ */
+ private class SLAComparator implements Comparator<SLA> {
+ @Override
+ public int compare(SLA arg0, SLA arg1) {
+ long first = arg1.getCheckTime().getMillis();
+ long second = arg0.getCheckTime().getMillis();
+
+ if (first == second) {
+ return 0;
+ } else if (first < second) {
+ return 1;
+ }
+
+ return -1;
+ }
+ }
+ }
+
+ private void takeSLAActions(SLA s, ExecutableFlow exflow) {
+ logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
+ List<SlaAction> actions = s.getActions();
+ for(SlaAction act : actions) {
+ if(act.equals(SlaAction.EMAIL)) {
+ try {
+ sendSlaAlertEmail(s, exflow);
+ }
+ catch (Exception e) {
+ logger.error("Failed to send out SLA alert email. " + e.getCause());
+ }
+ } else if(act.equals(SlaAction.KILL)) {
+ try {
+ executorManager.cancelFlow(exflow, "azkaban");
+ //sendSlaKillEmail(s, exflow);
+ } catch (ExecutorManagerException e) {
+ // TODO Auto-generated catch block
+ logger.error("Cancel flow failed." + e.getCause());
+ }
+ }
+ }
+ }
+
+ private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
+ String message = null;
+ ExecutableNode exnode;
+ switch(s.getRule()) {
+ case FINISH:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ case SUCCESS:
+ if(s.getJobName().equals("")) {
+ message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA. " + String.format("%n");
+ message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format(" %n");
+ message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+ }
+ else {
+ exnode = exflow.getExecutableNode(s.getJobName());
+ message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA." + String.format("%n");
+ message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+ message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+ }
+ break;
+ default:
+ logger.error("Unknown SLA rules!");
+ message = "Unknown SLA was not met!";
+ break;
+ }
+ mailer.sendSlaEmail(s, message);
+ }
+
+ public class SLAPreRunner extends Thread {
+ private final List<SLA> preSlas;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 300000;
+
+ public SLAPreRunner() {
+ preSlas = new ArrayList<SLA>();
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down pre-sla checker thread");
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ /**
+ * Return a list of flow with SLAs
+ *
+ * @return
+ */
+ protected synchronized List<SLA> getPreSlas() {
+ return new ArrayList<SLA>(preSlas);
+ }
+
+ /**
+ * Adds SLA into runner and then interrupts so it will update
+ * its wait time.
+ *
+ * @param flow
+ */
+ public synchronized void addCheckerPreSla(SLA s) {
+ logger.info("Adding " + s + " to pre-sla checker.");
+ preSlas.add(s);
+ this.interrupt();
+ }
+
+ /**
+ * Remove runner SLA. Does not interrupt.
+ *
+ * @param flow
+ * @throws SLAManagerException
+ */
+ public synchronized void removeCheckerPreSla(SLA s) {
+ logger.info("Removing " + s + " from the pre-sla checker.");
+ preSlas.remove(s);
+ }
+
+ public void run() {
+ while (stillAlive.get()) {
+ synchronized (this) {
+ try {
+ // TODO clear up the exception handling
+
+ if (preSlas.size() == 0) {
+ try {
+ this.wait(TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // interruption should occur when items are added or removed from the queue.
+ }
+ } else {
+ for(SLA s : preSlas) {
+ ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
+ String id = s.getJobName();
+ if(!s.equals("")) {
+ ExecutableNode exnode = exflow.getExecutableNode(id);
+ if(exnode.getStartTime() != -1) {
+
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected exception has been thrown in scheduler", e);
+ } catch (Throwable e) {
+ logger.error("Unexpected throwable has been thrown in scheduler", e);
+ }
+ }
+ }
+ }
+ }
+
+
+}
diff --git a/src/java/azkaban/sla/SLAManagerException.java b/src/java/azkaban/sla/SLAManagerException.java
new file mode 100644
index 0000000..a3468a2
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManagerException.java
@@ -0,0 +1,13 @@
+package azkaban.sla;
+
+public class SLAManagerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public SLAManagerException(String message) {
+ super(message);
+ }
+
+ public SLAManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
src/java/azkaban/utils/EmailMessage.java 20(+13 -7)
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 1ac7607..b510d6c 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -21,7 +21,10 @@ import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
+import com.sun.mail.smtp.SMTPTransport;
+
public class EmailMessage {
+ private static String protocol = "smtp";
private List<String> _toAddress = new ArrayList<String>();
private String _mailHost;
private String _mailUser;
@@ -129,12 +132,13 @@ public class EmailMessage {
public void sendEmail() throws MessagingException {
checkSettings();
Properties props = new Properties();
- props.setProperty("mail.transport.protocol", "smtp");
- props.put("mail.host", _mailHost);
+// props.setProperty("mail.transport.protocol", "smtp");
+ props.put("mail."+protocol+".host", _mailHost);
+ props.put("mail."+protocol+".auth", "true");
props.put("mail.user", _mailUser);
props.put("mail.password", _mailPassword);
- Session session = Session.getDefaultInstance(props);
+ Session session = Session.getInstance(props, null);
Message message = new MimeMessage(session);
InternetAddress from = new InternetAddress(_fromAddress, false);
message.setFrom(from);
@@ -160,11 +164,13 @@ public class EmailMessage {
message.setContent(_body.toString(), _mimeType);
}
- Transport transport = session.getTransport();
- transport.connect();
- transport.sendMessage(message,
+// Transport transport = session.getTransport();
+
+ SMTPTransport t = (SMTPTransport) session.getTransport(protocol);
+ t.connect(_mailHost, _mailUser, _mailPassword);
+ t.sendMessage(message,
message.getRecipients(Message.RecipientType.TO));
- transport.close();
+ t.close();
}
public void setBody(String body) {
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 430af3d..e3cceae 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -51,6 +51,9 @@ import azkaban.project.ProjectManager;
import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleManager;
+import azkaban.sla.JdbcSLALoader;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLAManagerException;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -118,6 +121,7 @@ public class AzkabanWebServer implements AzkabanServer {
private ProjectManager projectManager;
private ExecutorManager executorManager;
private ScheduleManager scheduleManager;
+ private SLAManager slaManager;
private final ClassLoader baseClassLoader;
@@ -144,7 +148,8 @@ public class AzkabanWebServer implements AzkabanServer {
userManager = loadUserManager(props);
projectManager = loadProjectManager(props);
executorManager = loadExecutorManager(props);
- scheduleManager = loadScheduleManager(executorManager, props);
+ slaManager = loadSLAManager(props);
+ scheduleManager = loadScheduleManager(executorManager, slaManager, props);
baseClassLoader = getBaseClassloader();
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -159,6 +164,8 @@ public class AzkabanWebServer implements AzkabanServer {
}
}
+
+
private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
this.viewerPlugins = viewerPlugins;
}
@@ -202,12 +209,17 @@ public class AzkabanWebServer implements AzkabanServer {
return execManager;
}
- private ScheduleManager loadScheduleManager(ExecutorManager execManager, Props props ) throws Exception {
- ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, new JdbcScheduleLoader(props));
+ private ScheduleManager loadScheduleManager(ExecutorManager execManager, SLAManager slaManager, Props props ) throws Exception {
+ ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, slaManager, new JdbcScheduleLoader(props));
return schedManager;
}
+ private SLAManager loadSLAManager(Props props) throws SLAManagerException {
+ SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
+ return slaManager;
+ }
+
/**
* Returns the web session cache.
*
@@ -249,6 +261,10 @@ public class AzkabanWebServer implements AzkabanServer {
return executorManager;
}
+ public SLAManager getSLAManager() {
+ return slaManager;
+ }
+
public ScheduleManager getScheduleManager() {
return scheduleManager;
}
@@ -654,4 +670,6 @@ public class AzkabanWebServer implements AzkabanServer {
return props;
}
+
+
}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 454(+402 -52)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index fd17899..b5da554 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -17,6 +17,8 @@
package azkaban.webapp.servlet;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -25,15 +27,24 @@ import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import javax.swing.text.StyledEditorKit.BoldAction;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.joda.time.Hours;
import org.joda.time.LocalDateTime;
+import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
+import azkaban.flow.Node;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectLogEvent.EventType;
@@ -46,13 +57,21 @@ import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
import azkaban.scheduler.Schedule;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.scheduler.ScheduleManager;
+import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaSetting;
public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
private ProjectManager projectManager;
private ScheduleManager scheduleManager;
+ private SLAManager slaManager;
private UserManager userManager;
@Override
@@ -62,51 +81,252 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
projectManager = server.getProjectManager();
scheduleManager = server.getScheduleManager();
userManager = server.getUserManager();
+ slaManager = server.getSLAManager();
}
@Override
protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
+ else {
+ handleGetAllSchedules(req, resp, session);
+ }
+ }
+
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ String ajaxName = getParam(req, "ajax");
+
+ if (ajaxName.equals("slaInfo")) {
+ ajaxSlaInfo(req, ret, session.getUser());
+ }
+ else if(ajaxName.equals("setSla")) {
+ ajaxSetSla(req, ret, session.getUser());
+ }
+ else if(ajaxName.equals("advSchedule")) {
+ ajaxAdvSchedule(req, ret, session.getUser());
+ }
+
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
+ }
+
+ private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+ try {
+
+ int projectId = getIntParam(req, "projectId");
+ String flowName = getParam(req, "flowName");
+
+ Project project = projectManager.getProject(projectId);
+ if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
+ ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
+ return;
+ }
+
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+
+ SlaOptions slaOptions= new SlaOptions();
+
+ String slaEmails = getParam(req, "slaEmails");
+ System.out.println(slaEmails);
+ String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+
+ Map<String, String> settings = getParamGroup(req, "settings");
+ System.out.println(settings);
+ List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
+ for(String set : settings.keySet()) {
+ SlaSetting s;
+ try {
+ s = parseSlaSetting(settings.get(set));
+ }
+ catch (Exception e) {
+ throw new ServletException(e);
+ }
+ if(s != null) {
+ slaSettings.add(s);
+ }
+ }
+
+ if(slaSettings.size() > 0) {
+ slaOptions.setSlaEmails(Arrays.asList(emailSplit));
+ slaOptions.setSettings(slaSettings);
+ }
+ else {
+ slaOptions = null;
+ }
+ sched.setSlaOptions(slaOptions);
+ scheduleManager.insertSchedule(sched);
+
+ } catch (ServletException e) {
+ ret.put("error", e);
+ }
+
+ }
+
+
+ private SlaSetting parseSlaSetting(String set) {
+ // "" + Duration + EmailAction + KillAction
+ String[] parts = set.split(",", -1);
+ String id = parts[0];
+ String rule = parts[1];
+ String duration = parts[2];
+ String emailAction = parts[3];
+ String killAction = parts[4];
+ if(emailAction.equals("on") || killAction.equals("on")) {
+ SlaSetting r = new SlaSetting();
+ r.setId(id);
+ r.setRule(SlaRule.valueOf(rule));
+ ReadablePeriod dur = parseDuration(duration);
+ r.setDuration(dur);
+ List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+ if(emailAction.equals("on")) {
+ actions.add(SlaAction.EMAIL);
+ }
+ if(killAction.equals("on")) {
+ actions.add(SlaAction.KILL);
+ }
+ r.setActions(actions);
+ return r;
+ }
+ return null;
+ }
+
+ private ReadablePeriod parseDuration(String duration) {
+ int hour = Integer.parseInt(duration.split(":")[0]);
+ int min = Integer.parseInt(duration.split(":")[1]);
+ return Minutes.minutes(min+hour*60).toPeriod();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+ int projId;
+ String flowName;
+ try {
+ projId = getIntParam(req, "projId");
+ flowName = getParam(req, "flowName");
+
+ Project project = getProjectAjaxByPermission(ret, projId, user, Type.READ);
+ if (project == null) {
+ ret.put("error", "Error loading project. Project " + projId + " doesn't exist");
+ return;
+ }
+
+ Flow flow = project.getFlow(flowName);
+ if (flow == null) {
+ ret.put("error", "Error loading flow. Flow " + flowName + " doesn't exist in " + projId);
+ return;
+ }
+
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
+
+ SlaOptions slaOptions = sched.getSlaOptions();
+ FlowOptions flowOptions = sched.getFlowOptions();
+
+ if(slaOptions != null) {
+ ret.put("slaEmails", slaOptions.getSlaEmails());
+ List<SlaSetting> settings = slaOptions.getSettings();
+ List<Object> setObj = new ArrayList<Object>();
+ for(SlaSetting set: settings) {
+ setObj.add(set.toObject());
+ }
+ ret.put("settings", setObj);
+ }
+ else if (flowOptions != null) {
+ if(flowOptions.getFailureEmails() != null) {
+ List<String> emails = flowOptions.getFailureEmails();
+ if(emails.size() > 0) {
+ ret.put("slaEmails", emails);
+ }
+ }
+ }
+ else {
+ if(flow.getFailureEmails() != null) {
+ List<String> emails = flow.getFailureEmails();
+ if(emails.size() > 0) {
+ ret.put("slaEmails", emails);
+ }
+ }
+ }
+
+ List<String> disabledJobs;
+ if(flowOptions != null) {
+ disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+ }
+ else {
+ disabledJobs = new ArrayList<String>();
+ }
+
+ List<String> allJobs = new ArrayList<String>();
+ for(Node n : flow.getNodes()) {
+ if(!disabledJobs.contains(n.getId())) {
+ allJobs.add(n.getId());
+ }
+ }
+ ret.put("allJobNames", allJobs);
+ } catch (ServletException e) {
+ ret.put("error", e);
+ }
+
+ }
+
+ protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ ret.put("error", "Project '" + project + "' not found.");
+ }
+ else if (!hasPermission(project, user, type)) {
+ ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + project.getName());
+ }
+ else {
+ return project;
+ }
+
+ return null;
+ }
+
+ private void handleGetAllSchedules(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException{
+
Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/scheduledflowpage.vm");
List<Schedule> schedules = scheduleManager.getSchedules();
page.add("schedules", schedules);
+//
+// List<SLA> slas = slaManager.getSLAs();
+// page.add("slas", slas);
page.render();
}
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- HashMap<String, Object> ret = new HashMap<String, Object>();
- if (hasParam(req, "action")) {
- String action = getParam(req, "action");
- if (action.equals("scheduleFlow")) {
- ajaxScheduleFlow(req, ret, session.getUser());
- }
- else if(action.equals("removeSched")){
- ajaxRemoveSched(req, ret, session.getUser());
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
+ else {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ if (hasParam(req, "action")) {
+ String action = getParam(req, "action");
+ if (action.equals("scheduleFlow")) {
+ ajaxScheduleFlow(req, ret, session.getUser());
+ }
+ else if(action.equals("removeSched")){
+ ajaxRemoveSched(req, ret, session.getUser());
+ }
}
+
+ if(ret.get("status") == ("success"))
+ setSuccessMessageInCookie(resp, (String) ret.get("message"));
+ else
+ setErrorMessageInCookie(resp, (String) ret.get("message"));
+
+ this.writeJSON(resp, ret);
}
-
- if(ret.get("status") == ("success"))
- setSuccessMessageInCookie(resp, (String) ret.get("message"));
- else
- setErrorMessageInCookie(resp, (String) ret.get("message"));
-
- this.writeJSON(resp, ret);
}
-
-// private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-// HashMap<String, Object> ret = new HashMap<String, Object>();
-// String ajaxName = getParam(req, "ajax");
-//
-// if (ajaxName.equals("scheduleFlow")) {
-// ajaxScheduleFlow(req, ret, session.getUser());
-// }
-//// }
-// this.writeJSON(resp, ret);
-// }
-//
private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
int projectId = getIntParam(req, "projectId");
@@ -197,40 +417,170 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
hour += 12;
hour %= 24;
-// String submitUser = user.getUserId();
-// String userExec = userSubmit;//getParam(req, "userExec");
-// String scheduleId = projectId + "." + flowName;
DateTime submitTime = new DateTime();
DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
- //ScheduledFlow schedFlow = scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
- //project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
- Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId());
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+ FlowOptions flowOptions = null;
+ SlaOptions slaOptions = null;
+ if(sched != null) {
+ if(sched.getFlowOptions() != null) {
+ flowOptions = sched.getFlowOptions();
+ }
+ if(sched.getSlaOptions() != null) {
+ slaOptions = sched.getSlaOptions();
+ }
+ }
+ Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+ logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
+ projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
+
+ ret.put("status", "success");
+ ret.put("message", projectName + "." + flowName + " scheduled.");
+ }
+
+ private void ajaxAdvSchedule(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+ String projectName = getParam(req, "projectName");
+ String flowName = getParam(req, "flowName");
+ int projectId = getIntParam(req, "projectId");
+
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ ret.put("message", "Project " + projectName + " does not exist");
+ ret.put("status", "error");
+ return;
+ }
+
+ if (!hasPermission(project, user, Type.SCHEDULE)) {
+ ret.put("status", "error");
+ ret.put("message", "Permission denied. Cannot execute " + flowName);
+ return;
+ }
+
+ Flow flow = project.getFlow(flowName);
+ if (flow == null) {
+ ret.put("status", "error");
+ ret.put("message", "Flow " + flowName + " cannot be found in project " + project);
+ return;
+ }
+
+ String scheduleTime = getParam(req, "scheduleTime");
+ String scheduleDate = getParam(req, "scheduleDate");
+ DateTime firstSchedTime;
+ try {
+ firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
+ }
+ catch (Exception e) {
+ ret.put("error", "Invalid date and/or time '" + scheduleDate + " " + scheduleTime);
+ return;
+ }
+
+ ReadablePeriod thePeriod = null;
+ try {
+ if(hasParam(req, "is_recurring") && getParam(req, "is_recurring").equals("on")) {
+ thePeriod = Schedule.parsePeriodString(getParam(req, "period"));
+ }
+ }
+ catch(Exception e){
+ ret.put("error", e.getMessage());
+ }
+
+ Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+ FlowOptions flowOptions = null;
+ try {
+ flowOptions = parseFlowOptions(req);
+ }
+ catch (Exception e) {
+ ret.put("error", e.getMessage());
+ }
+ SlaOptions slaOptions = null;
+ if(sched != null) {
+ if(sched.getSlaOptions() != null) {
+ slaOptions = sched.getSlaOptions();
+ }
+ }
+ Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
ret.put("status", "success");
ret.put("message", projectName + "." + flowName + " scheduled.");
}
-
-// private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
-// int period = getIntParam(req, "period");
-// String periodUnits = getParam(req, "period_units");
-// if("M".equals(periodUnits))
-// return Months.months(period);
-// else if("w".equals(periodUnits))
-// return Weeks.weeks(period);
-// else if("d".equals(periodUnits))
-// return Days.days(period);
-// else if("h".equals(periodUnits))
-// return Hours.hours(period);
-// else if("m".equals(periodUnits))
-// return Minutes.minutes(period);
-// else if("s".equals(periodUnits))
-// return Seconds.seconds(period);
-// else
-// throw new ServletException("Unknown period unit: " + periodUnits);
-// }
+
+ private FlowOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
+ FlowOptions flowOptions = new FlowOptions();
+ if (hasParam(req, "failureAction")) {
+ String option = getParam(req, "failureAction");
+ if (option.equals("finishCurrent") ) {
+ flowOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+ }
+ else if (option.equals("cancelImmediately")) {
+ flowOptions.setFailureAction(FailureAction.CANCEL_ALL);
+ }
+ else if (option.equals("finishPossible")) {
+ flowOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+ }
+ }
+
+ if (hasParam(req, "failureEmails")) {
+ String emails = getParam(req, "failureEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ flowOptions.setFailureEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "successEmails")) {
+ String emails = getParam(req, "successEmails");
+ String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ flowOptions.setSuccessEmails(Arrays.asList(emailSplit));
+ }
+ if (hasParam(req, "notifyFailureFirst")) {
+ flowOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+ }
+ if (hasParam(req, "notifyFailureLast")) {
+ flowOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+ }
+ if (hasParam(req, "executingJobOption")) {
+ //String option = getParam(req, "jobOption");
+ // Not set yet
+ }
+
+ Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
+ flowOptions.setFlowOverride(flowParamGroup);
+
+ if (hasParam(req, "disabledJobs")) {
+ String disable = getParam(req, "disabledJobs");
+ String[] disableSplit = disable.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ List<String> jobs = (List<String>) Arrays.asList(disableSplit);
+ flowOptions.setDisabledJobs(jobs.subList(1, jobs.size()));
+ }
+ return flowOptions;
+ }
+
+ private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
+ // scheduleTime: 12,00,pm,PDT
+ String[] parts = scheduleTime.split(",", -1);
+ int hour = Integer.parseInt(parts[0]);
+ int minutes = Integer.parseInt(parts[1]);
+ boolean isPm = parts[2].equalsIgnoreCase("pm");
+
+ DateTimeZone timezone = parts[3].equals("UTC") ? DateTimeZone.UTC : DateTimeZone.forID("America/Los_Angeles");
+
+ // scheduleDate: 02/10/2013
+ DateTime day = null;
+ if(scheduleDate == null || scheduleDate.trim().length() == 0) {
+ day = new LocalDateTime().toDateTime();
+ } else {
+ day = DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timezone).parseDateTime(scheduleDate);
+ }
+
+ if(isPm && hour < 12)
+ hour += 12;
+ hour %= 24;
+
+ DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
+
+ return firstSchedTime;
+ }
private boolean hasPermission(Project project, User user, Permission.Type type) {
if (project.hasPermission(user, type)) {
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index e28dab6..102c43f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -34,6 +34,7 @@
<script type="text/javascript" src="${context}/js/azkaban.flow.graph.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.exflow.options.view.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.schedule.options.view.js"></script>
<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
@@ -131,6 +132,7 @@
</div>
</div>
<!-- modal content -->
+
<div id="schedule-flow" class="modal">
<h3>Schedule Flow</h3>
<div id="errorMsg" class="box-error-message">$errorMsg</div>
@@ -173,21 +175,23 @@
</form>
</div>
-
<div class="actions">
- <a class="yes btn2" id="schedule-btn" href="#">Schedule The Flow</a>
- <a class="no simplemodal-close btn3" href="#">Cancel</a>
+ <a class="yes btn2" id="schedule-btn" href="#">Schedule The Flow</a>
+ <a class="no simplemodal-close btn3" href="#">Cancel</a>
+ <a class="btn2" id="adv-schedule-opt-btn" href="#">Advanced Schedule Options</a>
</div>
</div>
<div id="invalid-session" class="modal">
- <h3>Invalid Session</h3>
+ <h3>Invalid Session</h3>
<p>Session has expired. Please re-login.</p>
<div class="actions">
- <a class="yes btn3" id="login-btn" href="#">Re-login</a>
+ <a class="yes btn3" id="login-btn" href="#">Re-login</a>
</div>
</div>
-
+#parse( "azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm" )
+
#parse( "azkaban/webapp/servlet/velocity/executionoptionspanel.vm" )
+
#end
<ul id="jobMenu" class="contextMenu">
<li class="open"><a href="#open">Open...</a></li>
@@ -214,6 +218,26 @@
</ul>
</ul>
+ <ul id="scheduleDisableJobMenu" class="contextMenu flowSubmenu">
+ <li class="openwindow"><a href="#scheduleOpenwindow">Open in New Window...</a></li>
+ <li id="scheduleDisable" class="disable separator"><a href="#disable">Disable</a><div id="scheduleDisableArrow" class="context-sub-icon"></div></li>
+ <ul id="scheduleDisableSub" class="subMenu">
+ <li class="disableAll"><a href="#disableAll">All</a></li>
+ <li class="parents"><a href="#disableParents">Parents</a></li>
+ <li class="ancestors"><a href="#disableAncestors">All Ancestors</a></li>
+ <li class="children"><a href="#disableChildren">Children</a></li>
+ <li class="decendents"><a href="#disableDescendents">All Descendents</a></li>
+ </ul>
+ <li id="scheduleEnable" class="enable"><a href="#enable">Enable</a> <div id="scheduleEnableArrow" class="context-sub-icon"></div></li>
+ <ul id="scheduleEnableSub" class="subMenu">
+ <li class="enableAll"><a href="#enableAll">All</a></li>
+ <li class="parents"><a href="#enableParents">Parents</a></li>
+ <li class="ancestors"><a href="#enableAncestors">All Ancestors</a></li>
+ <li class="children"><a href="#enableChildren">Children</a></li>
+ <li class="decendents"><a href="#enableDescendents">All Descendents</a></li>
+ </ul>
+ </ul>
+
</div>
</body>
</html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index 09d387a..64d5a9d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -18,8 +18,12 @@
<html>
<head>
#parse( "azkaban/webapp/servlet/velocity/style.vm" )
- <script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>
- <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>
+ <link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui-timepicker-addon.css" />
+ <link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui.css" />
+ <script type="text/javascript" src="${context}/js/jquery/jquery-1.8.3.min.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.9.2.custom.min.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-timepicker-addon.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-sliderAccess.js"></script>
<script type="text/javascript" src="${context}/js/namespace.js"></script>
<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
@@ -32,7 +36,6 @@
var timezone = "${timezone}";
var errorMessage = null;
var successMessage = null;
-
</script>
</head>
<body>
@@ -43,12 +46,12 @@
<div class="content">
#if($errorMsg)
- <div class="box-error-message">$errorMsg</div>
+ <div class="box-error-message">$errorMsg</div>
#else
#if($error_message != "null")
- <div class="box-error-message">$error_message</div>
+ <div class="box-error-message">$error_message</div>
#elseif($success_message != "null")
- <div class="box-success-message">$success_message</div>
+ <div class="box-success-message">$success_message</div>
#end
#end
@@ -65,12 +68,12 @@
<!--th class="execid">Execution Id</th-->
<th>Flow</th>
<th>Project</th>
- <th>User</th>
<th>Submitted By</th>
<th class="date">First Scheduled to Run</th>
<th class="date">Next Execution Time</th>
<th class="date">Repeats Every</th>
- <th class="action">Action</th>
+ <th>Has SLA</th>
+ <th colspan="2" class="action">Action</th>
</tr>
</thead>
<tbody>
@@ -85,11 +88,12 @@
<a href="${context}/manager?project=${sched.projectName}">${sched.projectName}</a>
</td>
<td>${sched.submitUser}</td>
- <td>${sched.submitUser}</td>
<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
<td>$utils.formatDateTime(${sched.nextExecTime})</td>
<td>$utils.formatPeriod(${sched.period})</td>
+ <td>#if(${sched.slaOptions}) true #else false #end</td>
<td><button id="removeSchedBtn" onclick="removeSched(${sched.projectId}, '${sched.flowName}')" >Remove Schedule</button></td>
+ <td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.projectId}, '${sched.flowName}')" >Set SLA</button></td>
</tr>
#end
#else
@@ -97,8 +101,74 @@
#end
</tbody>
</table>
-
-
+ </div>
+
+ <!-- modal content -->
+
+ <div id="slaModalBackground" class="modalBackground2">
+ <div id="sla-options" class="modal modalContainer2">
+ <a href='#' title='Close' class='modal-close'>x</a>
+ <h3>SLA Options</h3>
+ <div>
+ <ul class="optionsPicker">
+ <li id="slaOptions">General SLA Options</li>
+ </ul>
+ </div>
+ <div class="optionsPane">
+ <div id="generalPanel" class="generalPanel panel">
+ <div id="slaActions">
+ <h4>SLA Alert Emails</h4>
+ <dl>
+ <dt >SLA Alert Emails</dt>
+ <dd>
+ <textarea id="slaEmails"></textarea>
+ </dd>
+ </dl>
+ </div>
+ <br></br>
+ <div id="slaRules">
+ <h4>Flow SLA Rules</h4>
+ <div class="tableDiv">
+ <table id="flowRulesTbl">
+ <thead>
+ <tr>
+ <th>Flow/Job</th>
+ <th>Sla Rule</th>
+ <th>Duration</th>
+ <th>Email Action</th>
+ <th>Kill Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr id="addRow"><td id="addRow-col" colspan="5"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+ </tbody>
+ </table>
+ </div>
+ <!--h4 style="visibility: hidden">Job SLA Rules</h4>
+ <div class="tableDiv" style="visibility: hidden">
+ <table id="jobRulesTbl">
+ <thead>
+ <tr>
+ <th>Flow/Job Id</th>
+ <th>Finish In</th>
+ <th>Duration</th>
+ <th>Email Action</th>
+ <th>Kill Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr id="addRow"><td id="addRow-col" colspan="4"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+ </tbody>
+ </table>
+ </div-->
+ </div>
+ </div>
+ </div>
+ <div class="actions">
+ <!--a class="yes btn1" id="remove-sla-btn" href="#">Remove SLA</a-->
+ <a class="yes btn1" id="set-sla-btn" href="#">Set/Change SLA</a>
+ <a class="no simplemodal-close btn3" id="sla-cancel-btn" href="#">Cancel</a>
+ </div>
</div>
</div>
</body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
new file mode 100644
index 0000000..fc6bc91
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
@@ -0,0 +1,159 @@
+<div id="scheduleModalBackground" class="modalBackground2">
+ <div id="schedule-options" class="modal modalContainer2">
+ <a href='#' title='Close' class='modal-close'>x</a>
+ <h3>Schedule Flow Options</h3>
+ <div>
+ <ul class="optionsPicker">
+ <li id="scheduleGeneralOptions">General Options</li>
+ <li id="scheduleFlowOptions">Flow Options</li>
+ <!--li id="scheduleSlaOptions">SLA Options</li-->
+ </ul>
+ </div>
+ <div class="optionsPane">
+ <!--div id="scheduleSlaPanel" class="generalPanel panel">
+ <div id="slaActions">
+ <h4>SLA Alert Emails</h4>
+ <dl>
+ <dt >SLA Alert Emails</dt>
+ <dd>
+ <textarea id="slaEmails"></textarea>
+ </dd>
+ </dl>
+ </div>
+ <div id="slaRules">
+ <h4>Flow SLA Rules</h4>
+ <div class="tableDiv">
+ <table id="flowRulesTbl">
+ <thead>
+ <tr>
+ <th>Flow/Job</th>
+ <th>Finish In</th>
+ <th>Email Action</th>
+ <th>Kill Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ </tbody>
+ </table>
+ </div>
+ <h4>Job SLA Rules</h4>
+ <div class="tableDiv">
+ <table id="jobRulesTbl">
+ <thead>
+ <tr>
+ <th>Flow/Job</th>
+ <th>Finish In</th>
+ <th>Email Action</th>
+ <th>Kill Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div-->
+ <div id="scheduleGeneralPanel" class="generalPanel panel">
+ <div id="scheduleInfo">
+ <dl>
+ <dt>Schedule Time</dt>
+ <dd>
+ <input id="advhour" type="text" size="2" value="12"/>
+ <input id="advminutes" type="text" size="2" value="00"/>
+ <select id="advam_pm">
+ <option>pm</option>
+ <option>am</option>
+ </select>
+ <select id="advtimezone">
+ <option>PDT</option>
+ <option>UTC</option>
+ </select>
+ </dd>
+ <dt>Schedule Date</dt><dd><input type="text" id="advdatepicker" /></dd>
+ <dt>Recurrence</dt>
+ <dd>
+ <input id="advis_recurring" type="checkbox" checked />
+ <span>repeat every</span>
+ <input id="advperiod" type="text" size="2" value="1"/>
+ <select id="advperiod_units">
+ <option value="d">Days</option>
+ <option value="h">Hours</option>
+ <option value="m">Minutes</option>
+ <option value="M">Months</option>
+ <option value="w">Weeks</option>
+ </select>
+ </dd>
+ </dl>
+ </div>
+ <br></br>
+ <br></br>
+ <div id="scheduleCompleteActions">
+ <h4>Completion Actions</h4>
+ <dl>
+ <dt>Failure Action</dt>
+ <dd>
+ <select id="scheduleFailureAction" name="failureAction">
+ <option value="finishCurrent">Finish Current Running</option>
+ <option value="cancelImmediately">Cancel All</option>
+ <option value="finishPossible">Finish All Possible</option>
+ </select>
+ </dd>
+ <dt>Failure Email</dt>
+ <dd>
+ <textarea id="scheduleFailureEmails"></textarea>
+ </dd>
+ <dt>Notify on Failure</dt>
+ <dd>
+ <input id="scheduleNotifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
+ <input id="scheduleNotifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
+ </dd>
+ <dt>Success Email</dt>
+ <dd>
+ <textarea id="scheduleSuccessEmails"></textarea>
+ </dd>
+ <dt>Concurrent Execution</dt>
+ <dd id="scheduleExecutingJob" class="disabled">
+ <input id="scheduleIgnore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
+ <input id="schedulePipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
+ <input id="scheduleQueue" class="radio" type="radio" name="concurrent" value="queue" /><label class="radioLabel" for="queue">Queue Job</label>
+ </dd>
+ </dl>
+ </div>
+ <div id="scheduleFlowPropertyOverride">
+ <h4>Flow Property Override</h4>
+ <div class="tableDiv">
+ <table>
+ <thead>
+ <tr>
+ <th>Name</th>
+ <th>Value</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr id="scheduleAddRow"><td id="addRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+ <div id="scheduleGraphPanel" class="graphPanel panel">
+ <div id="scheduleJobListCustom" class="jobList">
+ <div class="filterList">
+ <input class="filter" placeholder=" Job Filter" />
+ </div>
+ <div class="list">
+ </div>
+ <div class="btn5 resetPanZoomBtn" >Reset Pan Zoom</div>
+ </div>
+ <div id="scheduleSvgDivCustom" class="svgDiv" >
+ <svg class="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
+ </svg>
+ </div>
+ </div>
+ </div>
+ <div class="actions">
+ <a class="yes btn1" id="adv-schedule-btn" href="#">Schedule</a>
+ <a class="no simplemodal-close btn3" id="schedule-cancel-btn" href="#">Cancel</a>
+ </div>
+ </div>
+</div>
\ No newline at end of file
src/sql/create_schedule_table.sql 2(+2 -0)
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index defadce..381ce6d 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -12,6 +12,8 @@ CREATE TABLE schedules (
next_exec_time BIGINT,
submit_time BIGINT,
submit_user VARCHAR(128),
+ enc_type TINYINT,
+ options LONGBLOB,
primary key(project_id, flow_name)
) ENGINE=InnoDB;
src/sql/create_sla_table.sql 10(+10 -0)
diff --git a/src/sql/create_sla_table.sql b/src/sql/create_sla_table.sql
new file mode 100644
index 0000000..9914a03
--- /dev/null
+++ b/src/sql/create_sla_table.sql
@@ -0,0 +1,10 @@
+DROP TABLE if exists active_sla;
+CREATE TABLE active_sla (
+ exec_id INT NOT NULL,
+ job_name VARCHAR(128) NOT NULL,
+ check_time BIGINT NOT NULL,
+ rule TINYINT NOT NULL,
+ enc_type TINYINT,
+ options LONGBLOB NOT NULL,
+ primary key(exec_id, job_name, check_time, rule)
+) ENGINE=InnoDB;
src/web/css/azkaban.css 292(+277 -15)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 80f0f17..67d3287 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1346,8 +1346,8 @@ tr:hover td {
bottom: 0px;
}
-#graphPanel {
- background-color: #F0F0F0;
+.radioLabel.disabled {
+ opacity: 0.3;
}
#executing-options {
@@ -1357,14 +1357,6 @@ tr:hover td {
bottom: 40px;
}
-#scheduled {
-
-}
-
-.radioLabel.disabled {
- opacity: 0.3;
-}
-
#executing-options .svgDiv {
position: absolute;
background-color: #CCC;
@@ -1447,6 +1439,189 @@ tr:hover td {
border-bottom: 1px solid #CCC;
}
+#sla-options {
+ left: 100px;
+ right: 100px;
+ top: 50px;
+ bottom: 40px;
+}
+
+#sla-options .svgDiv {
+ position: absolute;
+ background-color: #CCC;
+ padding: 1px;
+ left: 270px;
+ right: 0px;
+ top: 0px;
+ bottom: 0px;
+}
+
+#sla-options .jobList {
+ position: absolute;
+ width: 255px;
+ top: 0px;
+ bottom: 0px;
+ padding: 5px;
+ background-color: #F0F0F0;
+}
+
+#sla-options .list {
+ width: 255px;
+}
+
+#sla-options ul.optionsPicker {
+ margin-left: 30px;
+}
+
+#sla-options ul.optionsPicker li {
+ float: left;
+ font-size: 12pt;
+ font-weight: bold;
+ margin-right: 15px;
+ cursor: pointer;
+ color: #CCC;
+}
+
+#sla-options ul.optionsPicker li.selected {
+ text-decoration: underline;
+ color: #000;
+}
+
+#sla-options ul.optionsPicker li.selected:hover {
+ color: #000;
+}
+
+#sla-options ul.optionsPicker li:hover {
+ color: #888;
+}
+
+#sla-options .optionsPane {
+ position: absolute;
+ top: 85px;
+ background-color: #FFF;
+ left: 0px;
+ right: 0px;
+ bottom: 0px;
+}
+
+#sla-options .panel {
+ position: absolute;
+ width: 100%;
+ top: 0px;
+ bottom: 65px;
+}
+
+#sla-options .generalPanel.panel {
+ background-color: #F4F4F4;
+ padding-top: 15px;
+}
+
+#sla-options h3 {
+ margin-left: 20px;
+ font-size: 14pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#sla-options h4 {
+ margin-left: 20px;
+ font-size: 12pt;
+ border-bottom: 1px solid #CCC;
+}
+
+
+#schedule-options {
+ left: 100px;
+ right: 100px;
+ top: 50px;
+ bottom: 40px;
+}
+
+#schedule-options .svgDiv {
+ position: absolute;
+ background-color: #CCC;
+ padding: 1px;
+ left: 270px;
+ right: 0px;
+ top: 0px;
+ bottom: 0px;
+}
+
+#schedule-options .jobList {
+ position: absolute;
+ width: 255px;
+ top: 0px;
+ bottom: 0px;
+ padding: 5px;
+ background-color: #F0F0F0;
+}
+
+#schedule-options .list {
+ width: 255px;
+}
+
+#schedule-options ul.optionsPicker {
+ margin-left: 30px;
+}
+
+#schedule-options ul.optionsPicker li {
+ float: left;
+ font-size: 12pt;
+ font-weight: bold;
+ margin-right: 15px;
+ cursor: pointer;
+ color: #CCC;
+}
+
+#schedule-options ul.optionsPicker li.selected {
+ text-decoration: underline;
+ color: #000;
+}
+
+#schedule-options ul.optionsPicker li.selected:hover {
+ color: #000;
+}
+
+#schedule-options ul.optionsPicker li:hover {
+ color: #888;
+}
+
+#schedule-options .optionsPane {
+ position: absolute;
+ top: 85px;
+ background-color: #FFF;
+ left: 0px;
+ right: 0px;
+ bottom: 0px;
+}
+
+#schedule-options .panel {
+ position: absolute;
+ width: 100%;
+ top: 0px;
+ bottom: 65px;
+}
+
+#schedule-options .generalPanel.panel {
+ background-color: #F4F4F4;
+ padding-top: 15px;
+}
+
+#schedule-options h3 {
+ margin-left: 20px;
+ font-size: 14pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#schedule-options h4 {
+ margin-left: 20px;
+ font-size: 12pt;
+ border-bottom: 1px solid #CCC;
+}
+
+#graphPanel {
+ background-color: #F0F0F0;
+}
+
#generalPanel {
overflow: auto;
}
@@ -1552,6 +1727,97 @@ tr:hover td {
top: 100px;
}
+#scheduleGraphPanel {
+ background-color: #F0F0F0;
+}
+
+#scheduleGeneralPanel {
+ overflow: auto;
+}
+
+#scheduleGeneralPanel dt {
+ width: 150px;
+ font-size: 10pt;
+ font-weight: bold;
+ margin-top: 5px;
+}
+
+#scheduleGeneralPanel textarea {
+ width: 500px;
+}
+
+#scheduleGeneralPanel table #addRow {
+ cursor: pointer;
+}
+
+#scheduleGeneralPanel table tr {
+ height: 24px;
+}
+
+#scheduleGeneralPanel table .editable {
+
+}
+
+#scheduleGeneralPanel table .editable input {
+ border: 1px solid #009FC9;
+ height: 16px;
+}
+
+#scheduleGeneralPanel table .name {
+ width: 40%;
+}
+
+#scheduleGeneralPanel span.addIcon {
+ display: block;
+ width: 16px;
+ height: 16px;
+ background-image: url("./images/addIcon.png");
+}
+
+#scheduleGeneralPanel span.removeIcon {
+ display: block;
+ visibility:hidden;
+ disabled: true;
+ width: 16px;
+ height: 16px;
+ background-image: url("./images/removeIcon.png");
+ cursor: pointer;
+}
+
+#scheduleGeneralPanel .editable:hover span.removeIcon {
+ visibility:visible;
+}
+
+#scheduleGeneralPanel {
+}
+
+#scheduleGeneralPanel span {
+ float: left;
+ margin-left: 5px;
+}
+
+#scheduleGeneralPanel dd {
+ font-size: 10pt;
+}
+
+#scheduleFlowPropertyOverride {
+ clear: both;
+ padding-top: 30px;
+}
+
+#scheduleFlowPropertyOverride .tableDiv {
+ padding-right: 20px;
+ padding-left: 20px;
+}
+
+#scheduleJobList {
+ position: absolute;
+ top: 0px;
+ left: 0px;
+ height: 100%;
+ width: 250px;
+}
+
.filter {
width: 100%;
}
@@ -1581,7 +1847,7 @@ tr:hover td {
background-position: 16px 0px;
}
-.list ul li:hover{
+.list ul li:hover {
background-color: #E1E3E2;
color: #009FC9;
}
@@ -2371,10 +2637,6 @@ tr.row td.tb-name {
cursor: pointer;
}
-.job-hover-menu {
- padding-top: 1px;
-}
-
.azkaban-charts .expandable-hitarea { background-position: -32px -16px; }
.azkaban-charts .expandable-hitarea.collapse { background-position: 0 -16px; }
/* clean up */
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index 57ff096..5f70a97 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -316,7 +316,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
$(curTarget).append(input);
$(input).focus();
this.editingTarget = curTarget;
- }
+ }
},
handleRemoveColumn : function(evt) {
var curTarget = evt.currentTarget;
@@ -477,4 +477,4 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
cloneModel.trigger("change:disabled");
}
}
-});
\ No newline at end of file
+});
src/web/js/azkaban.flow.view.js 110(+19 -91)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 0624033..584e3a4 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -280,82 +280,6 @@ azkaban.GraphModel = Backbone.Model.extend({});
var executionModel;
azkaban.ExecutionModel = Backbone.Model.extend({});
-var scheduleFlowView;
-azkaban.ScheduleFlowView = Backbone.View.extend({
- events : {
- "click #schedule-btn": "handleScheduleFlow"
- },
- initialize : function(settings) {
- $( "#datepicker" ).datepicker();
- $( "#datepicker" ).datepicker('setDate', new Date());
- $("#errorMsg").hide();
- },
- handleScheduleFlow : function(evt) {
- // First make sure we can upload
-// var projectName = $('#path').val();
- var description = $('#description').val();
-
- var hourVal = $('#hour').val();
- var minutesVal = $('#minutes').val();
- var ampmVal = $('#am_pm').val();
- var timezoneVal = $('#timezone').val();
- var dateVal = $('#datepicker').val();
- var is_recurringVal = $('#is_recurring').val();
- var periodVal = $('#period').val();
- var periodUnits = $('#period_units').val();
-
- console.log("Creating schedule for "+projectName+"."+flowName);
- $.ajax({
- async: "false",
- url: "schedule",
- dataType: "json",
- type: "POST",
- data: {
- action:"scheduleFlow",
-
- projectId:projectId,
- projectName:projectName,
- flowName:flowName,
- hour:hourVal,
- minutes:minutesVal,
- am_pm:ampmVal,
- timezone:timezoneVal,
- date:dateVal,
- userExec:"dummy",
- is_recurring:is_recurringVal,
- period:periodVal,
- period_units:periodUnits
- },
- success: function(data) {
- if (data.status == "success") {
- console.log("Successfully scheduled for "+projectName+"."+flowName);
- if (data.action == "redirect") {
- window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
- }
- else{
- $("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );
- window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
- }
- }
- else {
- if (data.action == "login") {
- window.location = "";
- }
- else {
- $("#errorMsg").text("ERROR: " + data.message);
- $("#errorMsg").slideDown("fast");
- }
- }
- }
- });
-
- },
- render: function() {
-
- }
-});
-
-
$(function() {
var selected;
// Execution model has to be created before the window switches the tabs.
@@ -367,8 +291,10 @@ $(function() {
graphModel = new azkaban.GraphModel();
svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
- scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow')});
+ scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow'), model: graphModel});
+ advancedScheduleView = new azkaban.AdvancedScheduleView({el:$('#schedule-options'), model: graphModel});
executeFlowView = new azkaban.ExecuteFlowView({el:$('#executing-options'), model: graphModel});
+
var requestURL = contextURL + "/manager";
// Set up the Flow options view. Create a new one every time :p
@@ -435,20 +361,22 @@ $(function() {
"json"
);
+
$('#scheduleflowbtn').click( function() {
- console.log("schedule button clicked");
- $('#schedule-flow').modal({
- closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
- position: ["20%",],
- containerId: 'confirm-container',
- containerCss: {
- 'height': '220px',
- 'width': '500px'
- },
- onShow: function (dialog) {
- var modal = this;
- $("#errorMsg").hide();
- }
- });
+ console.log("schedule button clicked");
+ $('#schedule-flow').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '500px'
+ },
+ onShow: function (dialog) {
+ var modal = this;
+ $("#errorMsg").hide();
+ }
+ });
});
+
});
src/web/js/azkaban.schedule.options.view.js 586(+586 -0)
diff --git a/src/web/js/azkaban.schedule.options.view.js b/src/web/js/azkaban.schedule.options.view.js
new file mode 100644
index 0000000..5b9b760
--- /dev/null
+++ b/src/web/js/azkaban.schedule.options.view.js
@@ -0,0 +1,586 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+$.namespace('azkaban');
+
+var scheduleCustomSvgGraphView;
+var scheduleCustomJobListView;
+
+var scheduleFlowView;
+
+var scheduleFlowData;
+
+//function recurseAllAncestors(nodes, disabledMap, id, disable) {
+// var node = nodes[id];
+//
+// if (node.inNodes) {
+// for (var key in node.inNodes) {
+// disabledMap[key] = disable;
+// recurseAllAncestors(nodes, disabledMap, key, disable);
+// }
+// }
+//}
+//
+//function recurseAllDescendents(nodes, disabledMap, id, disable) {
+// var node = nodes[id];
+//
+// if (node.outNodes) {
+// for (var key in node.outNodes) {
+// disabledMap[key] = disable;
+// recurseAllDescendents(nodes, disabledMap, key, disable);
+// }
+// }
+//}
+//
+azkaban.ScheduleContextMenu = Backbone.View.extend({
+ events : {
+ "click #scheduleDisableArrow" : "handleDisabledClick",
+ "click #scheduleEnableArrow" : "handleEnabledClick"
+ },
+ initialize: function(settings) {
+ $('#scheduleDisableSub').hide();
+ $('#scheduleEnableSub').hide();
+ },
+ handleEnabledClick: function(evt) {
+ if(evt.stopPropagation) {
+ evt.stopPropagation();
+ }
+ evt.cancelBubble=true;
+
+ if (evt.currentTarget.expanded) {
+ evt.currentTarget.expanded=false;
+ $('#scheduleEnableArrow').removeClass('collapse');
+ $('#scheduleEnableSub').hide();
+ }
+ else {
+ evt.currentTarget.expanded=true;
+ $('#scheduleEnableArrow').addClass('collapse');
+ $('#scheduleEnableSub').show();
+ }
+ },
+ handleDisabledClick: function(evt) {
+ if(evt.stopPropagation) {
+ evt.stopPropagation();
+ }
+ evt.cancelBubble=true;
+
+ if (evt.currentTarget.expanded) {
+ evt.currentTarget.expanded=false;
+ $('#scheduleDisableArrow').removeClass('collapse');
+ $('#scheduleDisableSub').hide();
+ }
+ else {
+ evt.currentTarget.expanded=true;
+ $('#scheduleDisableArrow').addClass('collapse');
+ $('#scheduleDisableSub').show();
+ }
+ }
+});
+
+azkaban.ScheduleFlowView = Backbone.View.extend({
+ events : {
+ "click #schedule-btn": "handleScheduleFlow",
+ "click #adv-schedule-opt-btn": "handleAdvancedSchedule"
+ },
+ initialize : function(settings) {
+ $( "#datepicker" ).datepicker();
+ $( "#datepicker" ).datepicker('setDate', new Date());
+ $("#errorMsg").hide();
+ },
+ handleAdvancedSchedule : function(evt) {
+ console.log("Clicked advanced schedule options button");
+ //$('#confirm-container').hide();
+ $.modal.close();
+ advancedScheduleView.show();
+ },
+ handleScheduleFlow : function(evt) {
+
+ var hourVal = $('#hour').val();
+ var minutesVal = $('#minutes').val();
+ var ampmVal = $('#am_pm').val();
+ var timezoneVal = $('#timezone').val();
+ var dateVal = $('#datepicker').val();
+ var is_recurringVal = $('#is_recurring').val();
+ var periodVal = $('#period').val();
+ var periodUnits = $('#period_units').val();
+
+ console.log("Creating schedule for "+projectName+"."+flowName);
+ $.ajax({
+ async: "false",
+ url: "schedule",
+ dataType: "json",
+ type: "POST",
+ data: {
+ action:"scheduleFlow",
+ projectId:projectId,
+ projectName:projectName,
+ flowName:flowName,
+ hour:hourVal,
+ minutes:minutesVal,
+ am_pm:ampmVal,
+ timezone:timezoneVal,
+ date:dateVal,
+ userExec:"dummy",
+ is_recurring:is_recurringVal,
+ period:periodVal,
+ period_units:periodUnits
+ },
+ success: function(data) {
+ if (data.status == "success") {
+ console.log("Successfully scheduled for "+projectName+"."+flowName);
+ if (data.action == "redirect") {
+ window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
+ }
+ else{
+ $("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );
+ window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
+ }
+ }
+ else {
+ if (data.action == "login") {
+ window.location = "";
+ }
+ else {
+ $("#errorMsg").text("ERROR: " + data.message);
+ $("#errorMsg").slideDown("fast");
+ }
+ }
+ }
+ });
+
+ },
+ render: function() {
+ }
+});
+
+azkaban.AdvancedScheduleView = Backbone.View.extend({
+ events : {
+ "click" : "closeEditingTarget",
+ "click #adv-schedule-btn": "handleAdvSchedule",
+ "click #schedule-cancel-btn": "handleCancel",
+ "click .modal-close": "handleCancel",
+ "click #scheduleGeneralOptions": "handleGeneralOptionsSelect",
+ "click #scheduleFlowOptions": "handleFlowOptionsSelect",
+ "click #scheduleAddRow": "handleAddRow",
+ "click table .editable": "handleEditColumn",
+ "click table .removeIcon": "handleRemoveColumn"
+ },
+ initialize: function(setting) {
+ this.contextMenu = new azkaban.ScheduleContextMenu({el:$('#scheduleDisableJobMenu')});
+ this.handleGeneralOptionsSelect();
+ $( "#advdatepicker" ).datepicker();
+ $( "#advdatepicker" ).datepicker('setDate', new Date());
+ },
+ show: function() {
+ $('#scheduleModalBackground').show();
+ $('#schedule-options').show();
+ this.handleGeneralOptionsSelect();
+
+ scheduleFlowData = this.model.clone();
+ this.flowData = scheduleFlowData;
+ var flowData = scheduleFlowData;
+
+ var fetchData = {"project": projectName, "ajax":"flowInfo", "flow":flowName};
+
+ var executeURL = contextURL + "/executor";
+ this.executeURL = executeURL;
+ var scheduleURL = contextURL + "/schedule";
+ this.scheduleURL = scheduleURL;
+ var handleAddRow = this.handleAddRow;
+
+ var data = flowData.get("data");
+ var nodes = {};
+ for (var i=0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ nodes[node.id] = node;
+ }
+
+ for (var i=0; i < data.edges.length; ++i) {
+ var edge = data.edges[i];
+ var fromNode = nodes[edge.from];
+ var toNode = nodes[edge.target];
+
+ if (!fromNode.outNodes) {
+ fromNode.outNodes = {};
+ }
+ fromNode.outNodes[toNode.id] = toNode;
+
+ if (!toNode.inNodes) {
+ toNode.inNodes = {};
+ }
+ toNode.inNodes[fromNode.id] = fromNode;
+ }
+ flowData.set({nodes: nodes});
+
+ var disabled = {};
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var updateNode = data.nodes[i];
+ if (updateNode.status == "DISABLED" || updateNode.status == "SKIPPED") {
+ updateNode.status = "READY";
+ disabled[updateNode.id] = true;
+ }
+ if (updateNode.status == "SUCCEEDED" || updateNode.status=="RUNNING") {
+ disabled[updateNode.id] = true;
+ }
+ }
+ flowData.set({disabled: disabled});
+
+ $.get(
+ executeURL,
+ fetchData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ if (data.successEmails) {
+ $('#scheduleSuccessEmails').val(data.successEmails.join());
+ }
+ if (data.failureEmails) {
+ $('#scheduleFailureEmails').val(data.failureEmails.join());
+ }
+
+ if (data.failureAction) {
+ $('#scheduleFailureAction').val(data.failureAction);
+ }
+ if (data.notifyFailureFirst) {
+ $('#scheduleNotifyFailureFirst').attr('checked', true);
+ }
+ if (data.notifyFailureLast) {
+ $('#scheduleNotifyFailureLast').attr('checked', true);
+ }
+ if (data.flowParam) {
+ var flowParam = data.flowParam;
+ for (var key in flowParam) {
+ var row = handleAddRow();
+ var td = $(row).find('td');
+ $(td[0]).text(key);
+ $(td[1]).text(flowParam[key]);
+ }
+ }
+
+ if (!data.running || data.running.length == 0) {
+ $(".radio").attr("disabled", "disabled");
+ $(".radioLabel").addClass("disabled", "disabled");
+ }
+ }
+ },
+ "json"
+ );
+ },
+ handleCancel: function(evt) {
+ $('#scheduleModalBackground').hide();
+ $('#schedule-options').hide();
+ },
+ handleGeneralOptionsSelect: function(evt) {
+ $('#scheduleFlowOptions').removeClass('selected');
+ $('#scheduleGeneralOptions').addClass('selected');
+
+ $('#scheduleGeneralPanel').show();
+ $('#scheduleGraphPanel').hide();
+ },
+ handleFlowOptionsSelect: function(evt) {
+ $('#scheduleGeneralOptions').removeClass('selected');
+ $('#scheduleFlowOptions').addClass('selected');
+
+ $('#scheduleGraphPanel').show();
+ $('#scheduleGeneralPanel').hide();
+
+ if (this.flowSetup) {
+ return;
+ }
+
+ scheduleCustomSvgGraphView = new azkaban.SvgGraphView({el:$('#scheduleSvgDivCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+ scheduleCustomJobsListView = new azkaban.JobListView({el:$('#scheduleJobListCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+ scheduleFlowData.trigger("change:graph");
+
+ this.flowSetup = true;
+ },
+ handleAdvSchedule: function(evt) {
+ var scheduleURL = this.scheduleURL;
+ var disabled = this.flowData.get("disabled");
+ var disabledJobs = "";
+ for(var job in disabled) {
+ if(disabled[job] == true) {
+ disabledJobs += "," + job;
+ }
+ }
+ var failureAction = $('#scheduleFailureAction').val();
+ var failureEmails = $('#scheduleFailureEmails').val();
+ var successEmails = $('#scheduleSuccessEmails').val();
+ var notifyFailureFirst = $('#scheduleNotifyFailureFirst').is(':checked');
+ var notifyFailureLast = $('#scheduleNotifyFailureLast').is(':checked');
+ var executingJobOption = $('input:radio[name=gender]:checked').val();
+
+
+ var scheduleTime = $('#advhour').val() + "," + $('#advminutes').val() + "," + $('#advam_pm').val() + "," + $('#advtimezone').val();
+ var scheduleDate = $('#advdatepicker').val();
+ var is_recurring = $('#advis_recurring').val();
+ var period = $('#advperiod').val() + $('#advperiod_units').val();
+
+ var flowOverride = {};
+ var editRows = $(".editRow");
+ for (var i = 0; i < editRows.length; ++i) {
+ var row = editRows[i];
+ var td = $(row).find('td');
+ var key = $(td[0]).text();
+ var val = $(td[1]).text();
+
+ if (key && key.length > 0) {
+ flowOverride[key] = val;
+ }
+ }
+
+ var scheduleData = {
+ projectId:projectId,
+ projectName: projectName,
+ ajax: "advSchedule",
+ flowName: flowName,
+ scheduleTime: scheduleTime,
+ scheduleDate: scheduleDate,
+ is_recurring: is_recurring,
+ period: period,
+ disabledJobs: disabledJobs,
+ failureAction: failureAction,
+ failureEmails: failureEmails,
+ successEmails: successEmails,
+ notifyFailureFirst: notifyFailureFirst,
+ notifyFailureLast: notifyFailureLast,
+ executingJobOption: executingJobOption,
+ flowOverride: flowOverride
+ };
+
+ $.post(
+ scheduleURL,
+ scheduleData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ window.location = scheduleURL;
+ }
+ },
+ "json"
+ )
+ },
+ handleAddRow: function(evt) {
+ var tr = document.createElement("tr");
+ var tdName = document.createElement("td");
+ var tdValue = document.createElement("td");
+
+ var icon = document.createElement("span");
+ $(icon).addClass("removeIcon");
+ var nameData = document.createElement("span");
+ $(nameData).addClass("spanValue");
+ var valueData = document.createElement("span");
+ $(valueData).addClass("spanValue");
+
+ $(tdName).append(icon);
+ $(tdName).append(nameData);
+ $(tdName).addClass("name");
+ $(tdName).addClass("editable");
+
+ $(tdValue).append(valueData);
+ $(tdValue).addClass("editable");
+
+ $(tr).addClass("editRow");
+ $(tr).append(tdName);
+ $(tr).append(tdValue);
+
+ $(tr).insertBefore("#scheduleAddRow");
+ return tr;
+ },
+ handleEditColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+
+ if (this.editingTarget != curTarget) {
+ this.closeEditingTarget();
+
+ var text = $(curTarget).children(".spanValue").text();
+ $(curTarget).empty();
+
+ var input = document.createElement("input");
+ $(input).attr("type", "text");
+ $(input).css("width", "100%");
+ $(input).val(text);
+ $(curTarget).addClass("editing");
+ $(curTarget).append(input);
+ $(input).focus();
+ this.editingTarget = curTarget;
+ }
+ },
+ handleRemoveColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+ // Should be the table
+ var row = curTarget.parentElement.parentElement;
+ $(row).remove();
+ },
+ closeEditingTarget: function(evt) {
+ if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
+ var input = $(this.editingTarget).children("input")[0];
+ var text = $(input).val();
+ $(input).remove();
+
+ var valueData = document.createElement("span");
+ $(valueData).addClass("spanValue");
+ $(valueData).text(text);
+
+ if ($(this.editingTarget).hasClass("name")) {
+ var icon = document.createElement("span");
+ $(icon).addClass("removeIcon");
+ $(this.editingTarget).append(icon);
+ }
+
+ $(this.editingTarget).removeClass("editing");
+ $(this.editingTarget).append(valueData);
+ this.editingTarget = null;
+ }
+ },
+ handleDisableMenuClick : function(action, el, pos) {
+ var flowData = scheduleFlowData;
+ var jobid = el[0].jobid;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
+ if (action == "open") {
+ window.location.href = requestURL;
+ }
+ else if(action == "openwindow") {
+ window.open(requestURL);
+ }
+ else if(action == "disable") {
+ var disabled = flowData.get("disabled");
+
+ disabled[jobid] = true;
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if(action == "disableAll") {
+ var disabled = flowData.get("disabled");
+
+ var nodes = flowData.get("nodes");
+ for (var key in nodes) {
+ disabled[key] = true;
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableParents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var inNodes = nodes[jobid].inNodes;
+
+ if (inNodes) {
+ for (var key in inNodes) {
+ disabled[key] = true;
+ }
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableChildren") {
+ var disabledMap = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var outNodes = nodes[jobid].outNodes;
+
+ if (outNodes) {
+ for (var key in outNodes) {
+ disabledMap[key] = true;
+ }
+ }
+
+ flowData.set({disabled: disabledMap});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableAncestors") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllAncestors(nodes, disabled, jobid, true);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "disableDescendents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllDescendents(nodes, disabled, jobid, true);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if(action == "enable") {
+ var disabled = flowData.get("disabled");
+
+ disabled[jobid] = false;
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if(action == "enableAll") {
+ disabled = {};
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableParents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var inNodes = nodes[jobid].inNodes;
+
+ if (inNodes) {
+ for (var key in inNodes) {
+ disabled[key] = false;
+ }
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableChildren") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+ var outNodes = nodes[jobid].outNodes;
+
+ if (outNodes) {
+ for (var key in outNodes) {
+ disabled[key] = false;
+ }
+ }
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableAncestors") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllAncestors(nodes, disabled, jobid, false);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ else if (action == "enableDescendents") {
+ var disabled = flowData.get("disabled");
+ var nodes = flowData.get("nodes");
+
+ recurseAllDescendents(nodes, disabled, jobid, false);
+
+ flowData.set({disabled: disabled});
+ flowData.trigger("change:disabled");
+ }
+ }
+});
\ No newline at end of file
src/web/js/azkaban.scheduled.view.js 353(+338 -15)
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index 5cea0ac..b66374b 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -1,21 +1,344 @@
$.namespace('azkaban');
+
function removeSched(projectId, flowName) {
- var scheduleURL = contextURL + "/schedule"
- var redirectURL = contextURL + "/schedule"
- $.post(
- scheduleURL,
- {"action":"removeSched", "projectId":projectId, "flowName":flowName},
- function(data) {
- if (data.error) {
+ var scheduleURL = contextURL + "/schedule"
+ var redirectURL = contextURL + "/schedule"
+ $.post(
+ scheduleURL,
+ {"action":"removeSched", "projectId":projectId, "flowName":flowName},
+ function(data) {
+ if (data.error) {
+// alert(data.error)
+ $('#errorMsg').text(data.error);
+ }
+ else {
+// alert("Schedule "+schedId+" removed!")
+ window.location = redirectURL;
+ }
+ },
+ "json"
+ )
+}
+
+function removeSla(projectId, flowName) {
+ var scheduleURL = contextURL + "/schedule"
+ var redirectURL = contextURL + "/schedule"
+ $.post(
+ scheduleURL,
+ {"action":"removeSla", "projectId":projectId, "flowName":flowName},
+ function(data) {
+ if (data.error) {
// alert(data.error)
- $('#errorMsg').text(data.error)
- }
- else {
+ $('#errorMsg').text(data.error)
+ }
+ else {
// alert("Schedule "+schedId+" removed!")
- window.location = redirectURL
- }
- },
- "json"
- )
+ window.location = redirectURL
+ }
+ },
+ "json"
+ )
}
+
+azkaban.ChangeSlaView = Backbone.View.extend({
+ events : {
+ "click" : "closeEditingTarget",
+ "click #set-sla-btn": "handleSetSla",
+ "click #remove-sla-btn": "handleRemoveSla",
+ "click #sla-cancel-btn": "handleSlaCancel",
+ "click .modal-close": "handleSlaCancel",
+ "click #addRow": "handleAddRow"
+ },
+ initialize: function(setting) {
+
+ },
+ handleSlaCancel: function(evt) {
+ console.log("Clicked cancel button");
+ var scheduleURL = contextURL + "/schedule";
+
+ $('#slaModalBackground').hide();
+ $('#sla-options').hide();
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ var rows = tFlowRules.rows;
+ var rowLength = rows.length
+ for(var i = 0; i < rowLength-1; i++) {
+ tFlowRules.deleteRow(0);
+ }
+
+ },
+ initFromSched: function(projId, flowName) {
+ this.projectId = projId;
+ this.flowName = flowName;
+
+ var scheduleURL = contextURL + "/schedule"
+ this.scheduleURL = scheduleURL;
+ var indexToName = {};
+ var nameToIndex = {};
+ var indexToText = {};
+ this.indexToName = indexToName;
+ this.nameToIndex = nameToIndex;
+ this.indexToText = indexToText;
+ var ruleBoxOptions = ["SUCCESS", "FINISH"];
+ this.ruleBoxOptions = ruleBoxOptions;
+
+ var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
+
+ $.get(
+ this.scheduleURL,
+ fetchScheduleData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ if (data.slaEmails) {
+ $('#slaEmails').val(data.slaEmails.join());
+ }
+
+ var allJobNames = data.allJobNames;
+
+ indexToName[0] = "";
+ nameToIndex[flowName] = 0;
+ indexToText[0] = "flow " + flowName;
+ for(var i = 1; i <= allJobNames.length; i++) {
+ indexToName[i] = allJobNames[i-1];
+ nameToIndex[allJobNames[i-1]] = i;
+ indexToText[i] = "job " + allJobNames[i-1];
+ }
+
+
+
+
+
+ // populate with existing settings
+ if(data.settings) {
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+
+ for(var setting in data.settings) {
+ var rFlowRule = tFlowRules.insertRow(0);
+
+ var cId = rFlowRule.insertCell(-1);
+ var idSelect = document.createElement("select");
+ for(var i in indexToName) {
+ idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+ if(data.settings[setting].id == indexToName[i]) {
+ idSelect.options[i].selected = true;
+ }
+ }
+ cId.appendChild(idSelect);
+
+ var cRule = rFlowRule.insertCell(-1);
+ var ruleSelect = document.createElement("select");
+ for(var i in ruleBoxOptions) {
+ ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+ if(data.settings[setting].rule == ruleBoxOptions[i]) {
+ ruleSelect.options[i].selected = true;
+ }
+ }
+ cRule.appendChild(ruleSelect);
+
+ var cDuration = rFlowRule.insertCell(-1);
+ var duration = document.createElement("input");
+ duration.type = "text";
+ duration.setAttribute("class", "durationpick");
+ var rawMinutes = data.settings[setting].duration;
+ var intMinutes = rawMinutes.substring(0, rawMinutes.length-1);
+ var minutes = parseInt(intMinutes);
+ var hours = Math.floor(minutes / 60);
+ minutes = minutes % 60;
+ duration.value = hours + ":" + minutes;
+ cDuration.appendChild(duration);
+
+ var cEmail = rFlowRule.insertCell(-1);
+ var emailCheck = document.createElement("input");
+ emailCheck.type = "checkbox";
+ for(var act in data.settings[setting].actions) {
+ if(data.settings[setting].actions[act] == "EMAIL") {
+ emailCheck.checked = true;
+ }
+ }
+ cEmail.appendChild(emailCheck);
+
+ var cKill = rFlowRule.insertCell(-1);
+ var killCheck = document.createElement("input");
+ killCheck.type = "checkbox";
+ for(var act in data.settings[setting].actions) {
+ if(data.settings[setting].actions[act] == "KILL") {
+ killCheck.checked = true;
+ }
+ }
+ cKill.appendChild(killCheck);
+
+ $('.durationpick').timepicker({hourMax: 99});
+ }
+ }
+ $('.durationpick').timepicker({hourMax: 99});
+ }
+ },
+ "json"
+ );
+
+ $('#slaModalBackground').show();
+ $('#sla-options').show();
+
+// this.schedFlowOptions = sched.flowOptions
+ console.log("Loaded schedule info. Ready to set SLA.");
+
+ },
+ handleRemoveSla: function(evt) {
+ console.log("Clicked remove sla button");
+ var scheduleURL = this.scheduleURL;
+ var redirectURL = this.scheduleURL;
+ $.post(
+ scheduleURL,
+ {"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
+ function(data) {
+ if (data.error) {
+ $('#errorMsg').text(data.error)
+ }
+ else {
+ window.location = redirectURL
+ }
+ "json"
+ }
+ );
+
+ },
+ handleSetSla: function(evt) {
+
+ var slaEmails = $('#slaEmails').val();
+ var settings = {};
+
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ for(var row = 0; row < tFlowRules.rows.length-1; row++) {
+ var rFlowRule = tFlowRules.rows[row];
+ var id = rFlowRule.cells[0].firstChild.value;
+ var rule = rFlowRule.cells[1].firstChild.value;
+ var duration = rFlowRule.cells[2].firstChild.value;
+ var email = rFlowRule.cells[3].firstChild.value;
+ var kill = rFlowRule.cells[4].firstChild.value;
+ settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill;
+ }
+
+ var slaData = {
+ projectId: this.projectId,
+ flowName: this.flowName,
+ ajax: "setSla",
+ slaEmails: slaEmails,
+ settings: settings
+ };
+
+ var scheduleURL = this.scheduleURL;
+
+ $.post(
+ scheduleURL,
+ slaData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ tFlowRules.length = 0;
+ window.location = scheduleURL;
+ }
+ },
+ "json"
+ );
+ },
+ handleAddRow: function(evt) {
+
+ var indexToName = this.indexToName;
+ var nameToIndex = this.nameToIndex;
+ var indexToText = this.indexToText;
+ var ruleBoxOptions = this.ruleBoxOptions;
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ var rFlowRule = tFlowRules.insertRow(0);
+
+ var cId = rFlowRule.insertCell(-1);
+ var idSelect = document.createElement("select");
+ for(var i in indexToName) {
+ idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+ }
+
+ cId.appendChild(idSelect);
+
+ var cRule = rFlowRule.insertCell(-1);
+ var ruleSelect = document.createElement("select");
+ for(var i in ruleBoxOptions) {
+ ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+ }
+ cRule.appendChild(ruleSelect);
+
+ var cDuration = rFlowRule.insertCell(-1);
+ var duration = document.createElement("input");
+ duration.type = "text";
+ duration.setAttribute("class", "durationpick");
+ cDuration.appendChild(duration);
+
+ var cEmail = rFlowRule.insertCell(-1);
+ var emailCheck = document.createElement("input");
+ emailCheck.type = "checkbox";
+ cEmail.appendChild(emailCheck);
+
+ var cKill = rFlowRule.insertCell(-1);
+ var killCheck = document.createElement("input");
+ killCheck.type = "checkbox";
+ cKill.appendChild(killCheck);
+
+ $('.durationpick').timepicker({hourMax: 99});
+
+ return rFlowRule;
+ },
+ handleEditColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+
+ if (this.editingTarget != curTarget) {
+ this.closeEditingTarget();
+
+ var text = $(curTarget).children(".spanValue").text();
+ $(curTarget).empty();
+
+ var input = document.createElement("input");
+ $(input).attr("type", "text");
+ $(input).css("width", "100%");
+ $(input).val(text);
+ $(curTarget).addClass("editing");
+ $(curTarget).append(input);
+ $(input).focus();
+ this.editingTarget = curTarget;
+ }
+ },
+ handleRemoveColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+ // Should be the table
+ var row = curTarget.parentElement.parentElement;
+ $(row).remove();
+ },
+ closeEditingTarget: function(evt) {
+
+ }
+});
+
+var slaView;
+
+$(function() {
+ var selected;
+
+
+ slaView = new azkaban.ChangeSlaView({el:$('#sla-options')});
+
+// var requestURL = contextURL + "/manager";
+
+ // Set up the Flow options view. Create a new one every time :p
+// $('#addSlaBtn').click( function() {
+// slaView.show();
+// });
+
+
+
+});
\ No newline at end of file
unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java 181(+136 -45)
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index f888f45..4a9491c 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -4,7 +4,9 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.sql.DataSource;
@@ -13,11 +15,24 @@ import junit.framework.Assert;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTimeZone;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.MutablePeriod;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+import org.joda.time.ReadablePeriod;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
import azkaban.utils.DataSourceUtils;
import azkaban.utils.Props;
@@ -25,7 +40,7 @@ public class JdbcScheduleLoaderTest {
private static boolean testDBExists;
private static final String host = "localhost";
private static final int port = 3306;
- private static final String database = "azkaban";
+ private static final String database = "azkaban2";
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
@@ -92,39 +107,6 @@ public class JdbcScheduleLoaderTest {
}
}
-// @Test
-// public void testLoadSchedule() {
-// if (!testDBExists) {
-// return;
-// }
-//
-// DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
-// Connection connection = null;
-// try {
-// connection = dataSource.getConnection();
-// } catch (SQLException e) {
-// e.printStackTrace();
-// testDBExists = false;
-// DbUtils.closeQuietly(connection);
-// return;
-// }
-//
-//// CountHandler countHandler = new CountHandler();
-// QueryRunner runner = new QueryRunner();
-// try {
-// int count = runner.update(connection, "DELETE FROM schedules");
-//
-// } catch (SQLException e) {
-// e.printStackTrace();
-// testDBExists = false;
-// DbUtils.closeQuietly(connection);
-// return;
-// }
-// finally {
-// DbUtils.closeQuietly(connection);
-// }
-// }
-
@Test
public void testInsertAndLoadSchedule() throws ScheduleManagerException {
if (!isTestSetup()) {
@@ -134,12 +116,34 @@ public class JdbcScheduleLoaderTest {
JdbcScheduleLoader loader = createLoader();
- Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
- Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc");
- Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
- Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
- Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
- Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+ List<String> disabledJobs = new ArrayList<String>();
+ disabledJobs.add("job1");
+ disabledJobs.add("job2");
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ SlaSetting set1 = new SlaSetting();
+ List<SlaAction> actions = new ArrayList<SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ set1.setActions(actions);
+ set1.setId("");
+ set1.setDuration(Schedule.parsePeriodString("1h"));
+ set1.setRule(SlaRule.FINISH);
+ slaSets.add(set1);
+ FlowOptions flowOptions = new FlowOptions();
+ flowOptions.setFailureEmails(emails);
+ flowOptions.setDisabledJobs(disabledJobs);
+ SlaOptions slaOptions = new SlaOptions();
+ slaOptions.setSlaEmails(emails);
+ slaOptions.setSettings(slaSets);
+
+ Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
+ Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+ Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
loader.insertSchedule(s1);
loader.insertSchedule(s2);
@@ -149,12 +153,77 @@ public class JdbcScheduleLoaderTest {
loader.insertSchedule(s6);
List<Schedule> schedules = loader.loadSchedules();
+ Schedule sched = schedules.get(0);
Assert.assertEquals(6, schedules.size());
- Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
- Assert.assertEquals(44444, schedules.get(0).getSubmitTime());
- Assert.assertEquals("1d", Schedule.createPeriodString(schedules.get(0).getPeriod()));
+ Assert.assertEquals("America/Los_Angeles", sched.getTimezone().getID());
+ Assert.assertEquals(44444, sched.getSubmitTime());
+ Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
+ FlowOptions fOpt = sched.getFlowOptions();
+ SlaOptions sOpt = sched.getSlaOptions();
+ Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
+ Assert.assertEquals("", sOpt.getSettings().get(0).getId());
+ Assert.assertEquals(Schedule.parsePeriodString("1h"), sOpt.getSettings().get(0).getDuration());
+ Assert.assertEquals(SlaRule.FINISH, sOpt.getSettings().get(0).getRule());
+ Assert.assertEquals(2, fOpt.getFailureEmails().size());
+ Assert.assertEquals(null, fOpt.getSuccessEmails());
+ Assert.assertEquals(2, fOpt.getDisabledJobs().size());
+ Assert.assertEquals(FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
+ Assert.assertEquals(null, fOpt.getFlowOverride());
+ }
+
+ @Test
+ public void testInsertAndUpdateSchedule() throws ScheduleManagerException {
+ if (!isTestSetup()) {
+ return;
+ }
+ clearDB();
+
+ JdbcScheduleLoader loader = createLoader();
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+ List<String> disabledJobs = new ArrayList<String>();
+ disabledJobs.add("job1");
+ disabledJobs.add("job2");
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ SlaSetting set1 = new SlaSetting();
+ List<SlaAction> actions = new ArrayList<SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ set1.setActions(actions);
+ set1.setId("");
+ set1.setDuration(Schedule.parsePeriodString("1h"));
+ set1.setRule(SlaRule.FINISH);
+ slaSets.add(set1);
+ FlowOptions flowOptions = new FlowOptions();
+ flowOptions.setFailureEmails(emails);
+ flowOptions.setDisabledJobs(disabledJobs);
+ SlaOptions slaOptions = new SlaOptions();
+ slaOptions.setSlaEmails(emails);
+ slaOptions.setSettings(slaSets);
+
+ System.out.println("the flow options are " + flowOptions);
+ System.out.println("the sla options are " + slaOptions);
+ Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+
+ loader.insertSchedule(s1);
+
+ emails.add("email3");
+ slaOptions.setSlaEmails(emails);
+
+ Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
+
+ loader.updateSchedule(s2);
+
+ List<Schedule> schedules = loader.loadSchedules();
+
+ Assert.assertEquals(1, schedules.size());
+ Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
+ Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
+ Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
+// System.out.println("the options are " + schedules.get(0).getSchedOptions());
+ Assert.assertEquals(3, schedules.get(0).getSlaOptions().getSlaEmails().size());
}
@Test
@@ -169,11 +238,33 @@ public class JdbcScheduleLoaderTest {
List<Schedule> schedules = new ArrayList<Schedule>();
- int stress = 100;
+ int stress = 10;
for(int i=0; i<stress; i++)
{
- Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+ List<String> disabledJobs = new ArrayList<String>();
+ disabledJobs.add("job1");
+ disabledJobs.add("job2");
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ SlaSetting set1 = new SlaSetting();
+ List<SlaAction> actions = new ArrayList<SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ set1.setActions(actions);
+ set1.setId("");
+ set1.setDuration(Schedule.parsePeriodString("1h"));
+ set1.setRule(SlaRule.FINISH);
+ slaSets.add(set1);
+ FlowOptions flowOptions = new FlowOptions();
+ flowOptions.setFailureEmails(emails);
+ flowOptions.setDisabledJobs(disabledJobs);
+ SlaOptions slaOptions = new SlaOptions();
+ slaOptions.setSlaEmails(emails);
+ slaOptions.setSettings(slaSets);
+
+ Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
schedules.add(s);
try {
loader.insertSchedule(s);
unit/java/azkaban/test/sla/JdbcSLALoaderTest.java 182(+182 -0)
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
new file mode 100644
index 0000000..02b7855
--- /dev/null
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -0,0 +1,182 @@
+package azkaban.test.sla;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.sla.JdbcSLALoader;
+import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLALoader;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.Props;
+
+
+
+public class JdbcSLALoaderTest {
+ private static boolean testDBExists;
+ //@TODO remove this and turn into local host.
+ private static final String host = "localhost";
+ private static final int port = 3306;
+ private static final String database = "azkaban2";
+ private static final String user = "azkaban";
+ private static final String password = "azkaban";
+ private static final int numConnections = 10;
+
+
+ @BeforeClass
+ public static void setupDB() {
+ DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ testDBExists = true;
+
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ CountHandler countHandler = new CountHandler();
+ QueryRunner runner = new QueryRunner();
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM active_sla", countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
+
+ clearDB();
+ }
+
+ @AfterClass
+ public static void clearDB() {
+ if (!testDBExists) {
+ return;
+ }
+
+ DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ QueryRunner runner = new QueryRunner();
+ try {
+ runner.update(connection, "DELETE FROM active_sla");
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
+ }
+
+ @Test
+ public void testInsertSLA() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ SLALoader loader = createLoader();
+
+ int execId = 1;
+ String jobName = "";
+ DateTime checkTime = new DateTime(11111);
+ List<String> emails = new ArrayList<String>();
+ emails.add("email1");
+ emails.add("email2");
+// List<SlaRule> rules = new ArrayList<SlaRule>();
+// rules.add(SlaRule.SUCCESS);
+// rules.add(SlaRule.FINISH);
+ List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+ actions.add(SlaAction.EMAIL);
+ SLA s = new SLA(execId, jobName, checkTime, emails, actions, null, SlaRule.FINISH);
+
+ loader.insertSLA(s);
+
+ List<SLA> allSLAs = loader.loadSLAs();
+ SLA fetchSLA = allSLAs.get(0);
+
+ Assert.assertTrue(allSLAs.size() == 1);
+ // Shouldn't be the same object.
+ Assert.assertTrue(s != fetchSLA);
+ Assert.assertEquals(s.getExecId(), fetchSLA.getExecId());
+ Assert.assertEquals(s.getJobName(), fetchSLA.getJobName());
+ Assert.assertEquals(s.getCheckTime(), fetchSLA.getCheckTime());
+ Assert.assertEquals(s.getEmails(), fetchSLA.getEmails());
+ Assert.assertEquals(s.getRule(), fetchSLA.getRule());
+ Assert.assertEquals(s.getActions().get(0), fetchSLA.getActions().get(0));
+
+
+ loader.removeSLA(s);
+
+ allSLAs = loader.loadSLAs();
+
+ Assert.assertTrue(allSLAs.size() == 0);
+ }
+
+ private SLALoader createLoader() {
+ Props props = new Props();
+ props.put("database.type", "mysql");
+
+ props.put("mysql.host", host);
+ props.put("mysql.port", port);
+ props.put("mysql.user", user);
+ props.put("mysql.database", database);
+ props.put("mysql.password", password);
+ props.put("mysql.numconnections", numConnections);
+
+ return new JdbcSLALoader(props);
+ }
+
+ private boolean isTestSetup() {
+ if (!testDBExists) {
+ System.err.println("Skipping DB test because Db not setup.");
+ return false;
+ }
+
+ System.out.println("Running DB test because Db setup.");
+ return true;
+ }
+
+ public static class CountHandler implements ResultSetHandler<Integer> {
+ @Override
+ public Integer handle(ResultSet rs) throws SQLException {
+ int val = 0;
+ while (rs.next()) {
+ val++;
+ }
+
+ return val;
+ }
+ }
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/utils/EmailMessageTest.java b/unit/java/azkaban/test/utils/EmailMessageTest.java
new file mode 100644
index 0000000..43bf216
--- /dev/null
+++ b/unit/java/azkaban/test/utils/EmailMessageTest.java
@@ -0,0 +1,46 @@
+package azkaban.test.utils;
+
+import java.io.IOException;
+
+import javax.mail.MessagingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.EmailMessage;
+
+public class EmailMessageTest {
+
+ String host = "";
+ String sender = "";
+ String user = "";
+ String password = "";
+
+ String toAddr = "";
+
+ private EmailMessage em;
+ @Before
+ public void setUp() throws Exception {
+ em = new EmailMessage(host, user, password);
+ em.setFromAddress(sender);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testSendEmail() throws IOException {
+ em.addToAddress(toAddr);
+ //em.addToAddress("cyu@linkedin.com");
+ em.setSubject("azkaban test email");
+ em.setBody("azkaban test email");
+ try {
+ em.sendEmail();
+ } catch (MessagingException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}