azkaban-developers
Changes
src/java/azkaban/migration/scheduler/Schedule.java 363(+363 -0)
src/java/azkaban/migration/sla/SLA.java 252(+252 -0)
Details
diff --git a/src/java/azkaban/migration/schedule2trigger/CommonParams.java b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
new file mode 100644
index 0000000..0408f51
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
@@ -0,0 +1,22 @@
+package azkaban.migration.schedule2trigger;
+
+public class CommonParams {
+ public static final String TYPE_FLOW_FINISH = "FlowFinish";
+ public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
+ public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
+
+ public static final String TYPE_JOB_FINISH = "JobFinish";
+ public static final String TYPE_JOB_SUCCEED = "JobSucceed";
+ public static final String TYPE_JOB_PROGRESS = "JobProgress";
+
+ public static final String INFO_DURATION = "Duration";
+ public static final String INFO_FLOW_NAME = "FlowName";
+ public static final String INFO_JOB_NAME = "JobName";
+ public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
+ public static final String INFO_EMAIL_LIST = "EmailList";
+
+ // always alert
+ public static final String ALERT_TYPE = "SlaAlertType";
+ public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
+ public static final String ACTION_ALERT = "SlaAlert";
+}
diff --git a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
new file mode 100644
index 0000000..6d1bd39
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
@@ -0,0 +1,256 @@
+package azkaban.migration.schedule2trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import azkaban.executor.ExecutionOptions;
+import static azkaban.migration.schedule2trigger.CommonParams.*;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.utils.Utils;
+
+public class Schedule2Trigger {
+
+ private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
+ private static Props props;
+ private static File outputDir;
+
+ public static void main(String[] args) throws Exception{
+ if(args.length < 1) {
+ printUsage();
+ }
+
+ File confFile = new File(args[0]);
+ try {
+ logger.info("Trying to load config from " + confFile.getAbsolutePath());
+ props = loadAzkabanConfig(confFile);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ try {
+ outputDir = File.createTempFile("schedules", null);
+ logger.info("Creating temp dir for dumping existing schedules.");
+ outputDir.delete();
+ outputDir.mkdir();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ try {
+ schedule2File();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ try {
+ file2ScheduleTrigger();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ logger.info("Uploaded all schedules. Removing temp dir.");
+ FileUtils.deleteDirectory(outputDir);
+ System.exit(0);
+ }
+
+ private static Props loadAzkabanConfig(File confFile) throws IOException {
+ return new Props(null, confFile);
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: schedule2Trigger PATH_TO_CONFIG_FILE");
+ }
+
+ private static void schedule2File() throws Exception {
+ azkaban.migration.scheduler.ScheduleLoader scheduleLoader = new azkaban.migration.scheduler.JdbcScheduleLoader(props);
+ logger.info("Loading old schedule info from DB.");
+ List<azkaban.migration.scheduler.Schedule> schedules = scheduleLoader.loadSchedules();
+ for(azkaban.migration.scheduler.Schedule sched : schedules) {
+ writeScheduleFile(sched, outputDir);
+ }
+ }
+
+ private static void writeScheduleFile(azkaban.migration.scheduler.Schedule sched, File outputDir) throws IOException {
+ String scheduleFileName = sched.getProjectName()+"-"+sched.getFlowName();
+ File outputFile = new File(outputDir, scheduleFileName);
+ outputFile.createNewFile();
+ Props props = new Props();
+ props.put("flowName", sched.getFlowName());
+ props.put("projectName", sched.getProjectName());
+ props.put("projectId", String.valueOf(sched.getProjectId()));
+ props.put("period", azkaban.migration.scheduler.Schedule.createPeriodString(sched.getPeriod()));
+ props.put("firstScheduleTimeLong", sched.getFirstSchedTime());
+ props.put("timezone", sched.getTimezone().getID());
+ props.put("submitUser", sched.getSubmitUser());
+ props.put("submitTimeLong", sched.getSubmitTime());
+ props.put("nextExecTimeLong", sched.getNextExecTime());
+
+ ExecutionOptions executionOptions = sched.getExecutionOptions();
+ if(executionOptions != null) {
+ props.put("executionOptionsObj", JSONUtils.toJSON(executionOptions.toObject()));
+ }
+
+ azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
+ if(slaOptions != null) {
+
+ List<Map<String, Object>> settingsObj = new ArrayList<Map<String,Object>>();
+ List<azkaban.migration.sla.SLA.SlaSetting> settings = slaOptions.getSettings();
+ for(azkaban.migration.sla.SLA.SlaSetting set : settings) {
+ Map<String, Object> setObj = new HashMap<String, Object>();
+ String setId = set.getId();
+ azkaban.migration.sla.SLA.SlaRule rule = set.getRule();
+ Map<String, Object> info = new HashMap<String, Object>();
+ info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule.createPeriodString(set.getDuration()));
+ info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());
+ List<String> actionsList = new ArrayList<String>();
+ for(azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
+ if(act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
+ actionsList.add(ACTION_ALERT);
+ info.put(ALERT_TYPE, "email");
+ } else if(act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
+ actionsList.add(ACTION_CANCEL_FLOW);
+ }
+ }
+ setObj.put("actions", actionsList);
+ if(setId.equals("")) {
+ info.put(INFO_FLOW_NAME, sched.getFlowName());
+ if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+ setObj.put("type", TYPE_FLOW_FINISH);
+ } else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+ setObj.put("type", TYPE_FLOW_SUCCEED);
+ }
+ } else {
+ info.put(INFO_JOB_NAME, setId);
+ if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+ setObj.put("type", TYPE_JOB_FINISH);
+ } else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+ setObj.put("type", TYPE_JOB_SUCCEED);
+ }
+ }
+ setObj.put("info", info);
+ settingsObj.add(setObj);
+ }
+
+ props.put("slaOptionsObj", JSONUtils.toJSON(settingsObj));
+ }
+ props.storeLocal(outputFile);
+ }
+
+ private static void file2ScheduleTrigger() throws Exception {
+
+ TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
+ for(File scheduleFile : outputDir.listFiles()) {
+ logger.info("Trying to load schedule from " + scheduleFile.getAbsolutePath());
+ if(scheduleFile.isFile()) {
+ Props schedProps = new Props(null, scheduleFile);
+ String flowName = schedProps.getString("flowName");
+ String projectName = schedProps.getString("projectName");
+ int projectId = schedProps.getInt("projectId");
+ long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
+// DateTime firstSchedTime = new DateTime(firstSchedTimeLong);
+ String timezoneId = schedProps.getString("timezone");
+ DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+ ReadablePeriod period = Utils.parsePeriodString(schedProps.getString("period"));
+// DateTime lastModifyTime = DateTime.now();
+ long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
+// DateTime nextExecTime = new DateTime(nextExecTimeLong);
+ long submitTimeLong = schedProps.getLong("submitTimeLong");
+// DateTime submitTime = new DateTime(submitTimeLong);
+ String submitUser = schedProps.getString("submitUser");
+ ExecutionOptions executionOptions = null;
+ if(schedProps.containsKey("executionOptionsObj")) {
+ String executionOptionsObj = schedProps.getString("executionOptionsObj");
+ executionOptions = ExecutionOptions.createFromObject(JSONUtils.parseJSONFromString(executionOptionsObj));
+ } else {
+ executionOptions = new ExecutionOptions();
+ }
+ List<azkaban.sla.SlaOption> slaOptions = null;
+ if(schedProps.containsKey("slaOptionsObj")) {
+ slaOptions = new ArrayList<azkaban.sla.SlaOption>();
+ List<Map<String, Object>> settingsObj = (List<Map<String, Object>>) JSONUtils.parseJSONFromString(schedProps.getString("slaOptionsObj"));
+ for(Map<String, Object> sla : settingsObj) {
+ String type = (String) sla.get("type");
+ Map<String, Object> info = (Map<String, Object>) sla.get("info");
+ List<String> actions = (List<String>) sla.get("actions");
+ azkaban.sla.SlaOption slaOption = new azkaban.sla.SlaOption(type, actions, info);
+ slaOptions.add(slaOption);
+ }
+ }
+
+ azkaban.scheduler.Schedule schedule = new azkaban.scheduler.Schedule(-1, projectId, projectName, flowName, "ready", firstSchedTimeLong, timezone, period, DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong, submitUser, executionOptions, slaOptions);
+ Trigger t = scheduleToTrigger(schedule);
+ logger.info("Ready to insert trigger " + t.getDescription());
+ triggerLoader.addTrigger(t);
+
+ }
+
+ }
+ }
+
+
+ private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
+
+ Condition triggerCondition = createTimeTriggerCondition(s);
+ Condition expireCondition = createTimeExpireCondition(s);
+ List<TriggerAction> actions = createActions(s);
+ Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), azkaban.scheduler.ScheduleManager.triggerSource, triggerCondition, expireCondition, actions);
+ if(s.isRecurring()) {
+ t.setResetOnTrigger(true);
+ }
+ return t;
+ }
+
+ private static List<TriggerAction> createActions (azkaban.scheduler.Schedule s) {
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
+ actions.add(executeAct);
+
+ return actions;
+ }
+
+ private static Condition createTimeTriggerCondition (azkaban.scheduler.Schedule s) {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ checkers.put(checker.getId(), checker);
+ String expr = checker.getId() + ".eval()";
+ Condition cond = new Condition(checkers, expr);
+ return cond;
+ }
+
+ // if failed to trigger, auto expire?
+ private static Condition createTimeExpireCondition (azkaban.scheduler.Schedule s) {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ checkers.put(checker.getId(), checker);
+ String expr = checker.getId() + ".eval()";
+ Condition cond = new Condition(checkers, expr);
+ return cond;
+ }
+
+}
diff --git a/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
new file mode 100644
index 0000000..bcef168
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+@Deprecated
+public class JdbcScheduleLoader implements ScheduleLoader {
+
+ private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
+
+ public static enum EncodingType {
+ PLAIN(1), GZIP(2);
+
+ private int numVal;
+
+ EncodingType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EncodingType fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return PLAIN;
+ case 2:
+ return GZIP;
+ default:
+ return PLAIN;
+ }
+ }
+ }
+
+ private DataSource dataSource;
+ private EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ private static final String scheduleTableName = "schedules";
+
+ private static String SELECT_ALL_SCHEDULES =
+ "SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
+
+ private static String INSERT_SCHEDULE =
+ "INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+ private static String REMOVE_SCHEDULE_BY_KEY =
+ "DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
+
+ private static String UPDATE_SCHEDULE_BY_KEY =
+ "UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
+
+ private static String UPDATE_NEXT_EXEC_TIME =
+ "UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+
+ private Connection getConnection() throws ScheduleManagerException {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new ScheduleManagerException("Error getting DB connection.", e);
+ }
+
+ return connection;
+ }
+
+ public EncodingType getDefaultEncodingType() {
+ return defaultEncodingType;
+ }
+
+ public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ this.defaultEncodingType = defaultEncodingType;
+ }
+
+ public JdbcScheduleLoader(Props props) {
+ String databaseType = props.getString("database.type");
+
+ if (databaseType.equals("mysql")) {
+ int port = props.getInt("mysql.port");
+ String host = props.getString("mysql.host");
+ String database = props.getString("mysql.database");
+ String user = props.getString("mysql.user");
+ String password = props.getString("mysql.password");
+ int numConnections = props.getInt("mysql.numconnections");
+
+ dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ }
+ }
+
+ @Override
+ public List<Schedule> loadSchedules() throws ScheduleManagerException {
+ logger.info("Loading all schedules from db.");
+ Connection connection = getConnection();
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();
+
+ List<Schedule> schedules;
+
+ try {
+ schedules = runner.query(connection, SELECT_ALL_SCHEDULES, handler);
+ } catch (SQLException e) {
+ logger.error(SELECT_ALL_SCHEDULES + " failed.");
+
+ DbUtils.closeQuietly(connection);
+ throw new ScheduleManagerException("Loading schedules from db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ logger.info("Now trying to update the schedules");
+
+ // filter the schedules
+ Iterator<Schedule> scheduleIterator = schedules.iterator();
+ while (scheduleIterator.hasNext()) {
+ Schedule sched = scheduleIterator.next();
+ if(!sched.updateTime()) {
+ logger.info("Schedule " + sched.getScheduleName() + " was scheduled before azkaban start, skipping it.");
+ scheduleIterator.remove();
+ removeSchedule(sched);
+ }
+ else {
+ logger.info("Recurring schedule, need to update next exec time");
+ try {
+ updateNextExecTime(sched);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new ScheduleManagerException("Update next execution time failed.", e);
+ }
+ logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
+ }
+ }
+
+
+
+ logger.info("Loaded " + schedules.size() + " schedules.");
+
+ return schedules;
+ }
+
+ @Override
+ public void removeSchedule(Schedule s) throws ScheduleManagerException {
+ logger.info("Removing schedule " + s.getScheduleName() + " from db.");
+
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ int removes = runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
+ if (removes == 0) {
+ throw new ScheduleManagerException("No schedule has been removed.");
+ }
+ } catch (SQLException e) {
+ logger.error(REMOVE_SCHEDULE_BY_KEY + " failed.");
+ throw new ScheduleManagerException("Remove schedule " + s.getScheduleName() + " from db failed. ", e);
+ }
+ }
+
+
+ public void insertSchedule(Schedule s) throws ScheduleManagerException {
+ logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
+ insertSchedule(s, defaultEncodingType);
+ }
+
+ public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+ String json = JSONUtils.toJSON(s.optionsToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
+ }
+
+ QueryRunner runner = new QueryRunner(dataSource);
+ try {
+ int inserts = runner.update(
+ INSERT_SCHEDULE,
+ s.getProjectId(),
+ s.getProjectName(),
+ s.getFlowName(),
+ s.getStatus(),
+ s.getFirstSchedTime(),
+ s.getTimezone().getID(),
+ Schedule.createPeriodString(s.getPeriod()),
+ s.getLastModifyTime(),
+ s.getNextExecTime(),
+ s.getSubmitTime(),
+ s.getSubmitUser(),
+ encType.getNumVal(),
+ data);
+ if (inserts == 0) {
+ throw new ScheduleManagerException("No schedule has been inserted.");
+ }
+ } catch (SQLException e) {
+ logger.error(INSERT_SCHEDULE + " failed.");
+ throw new ScheduleManagerException("Insert schedule " + s.getScheduleName() + " into db failed. ", e);
+ }
+ }
+
+ @Override
+ public void updateNextExecTime(Schedule s) throws ScheduleManagerException
+ {
+ logger.info("Update schedule " + s.getScheduleName() + " into db. ");
+ Connection connection = getConnection();
+ QueryRunner runner = new QueryRunner();
+ try {
+
+ runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
+ throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ @Override
+ public void updateSchedule(Schedule s) throws ScheduleManagerException {
+ logger.info("Updating schedule " + s.getScheduleName() + " into db.");
+ updateSchedule(s, defaultEncodingType);
+ }
+
+ public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+ String json = JSONUtils.toJSON(s.optionsToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
+ }
+
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ int updates = runner.update(
+ UPDATE_SCHEDULE_BY_KEY,
+ s.getStatus(),
+ s.getFirstSchedTime(),
+ s.getTimezone().getID(),
+ Schedule.createPeriodString(s.getPeriod()),
+ s.getLastModifyTime(),
+ s.getNextExecTime(),
+ s.getSubmitTime(),
+ s.getSubmitUser(),
+ encType.getNumVal(),
+ data,
+ s.getProjectId(),
+ s.getFlowName());
+ if (updates == 0) {
+ throw new ScheduleManagerException("No schedule has been updated.");
+ }
+ } catch (SQLException e) {
+ logger.error(UPDATE_SCHEDULE_BY_KEY + " failed.");
+ throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+ }
+ }
+
+ public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
+ @Override
+ public List<Schedule> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<Schedule>emptyList();
+ }
+
+ ArrayList<Schedule> schedules = new ArrayList<Schedule>();
+ do {
+ int projectId = rs.getInt(1);
+ String projectName = rs.getString(2);
+ String flowName = rs.getString(3);
+ String status = rs.getString(4);
+ long firstSchedTime = rs.getLong(5);
+ DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
+ ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
+ long lastModifyTime = rs.getLong(8);
+ long nextExecTime = rs.getLong(9);
+ long submitTime = rs.getLong(10);
+ String submitUser = rs.getString(11);
+ int encodingType = rs.getInt(12);
+ byte[] data = rs.getBytes(13);
+
+ Object optsObj = null;
+ if (data != null) {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ } catch (IOException e) {
+ throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
+ }
+ }
+
+ Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+ if (optsObj != null) {
+ s.createAndSetScheduleOptions(optsObj);
+ }
+
+ schedules.add(s);
+ } while (rs.next());
+
+ return schedules;
+ }
+
+ }
+}
\ No newline at end of file
src/java/azkaban/migration/scheduler/Schedule.java 363(+363 -0)
diff --git a/src/java/azkaban/migration/scheduler/Schedule.java b/src/java/azkaban/migration/scheduler/Schedule.java
new file mode 100644
index 0000000..9490243
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/Schedule.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.migration.sla.SlaOptions;
+import azkaban.utils.Pair;
+
+@Deprecated
+public class Schedule{
+
+// private long projectGuid;
+// private long flowGuid;
+
+// private String scheduleId;
+
+ private int projectId;
+ private String projectName;
+ private String flowName;
+ private long firstSchedTime;
+ private DateTimeZone timezone;
+ private long lastModifyTime;
+ private ReadablePeriod period;
+ private long nextExecTime;
+ private String submitUser;
+ private String status;
+ private long submitTime;
+
+ private ExecutionOptions executionOptions;
+ private SlaOptions slaOptions;
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = timezone;
+ this.lastModifyTime = lastModifyTime;
+ this.period = period;
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.executionOptions = null;
+ this.slaOptions = null;
+ }
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ String timezoneId,
+ String period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ ExecutionOptions executionOptions,
+ SlaOptions slaOptions
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = DateTimeZone.forID(timezoneId);
+ this.lastModifyTime = lastModifyTime;
+ this.period = parsePeriodString(period);
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.executionOptions = executionOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ ExecutionOptions executionOptions,
+ SlaOptions slaOptions
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = timezone;
+ this.lastModifyTime = lastModifyTime;
+ this.period = period;
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.executionOptions = executionOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public ExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
+ public void setFlowOptions(ExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ }
+
+ public SlaOptions getSlaOptions() {
+ return slaOptions;
+ }
+
+ public void setSlaOptions(SlaOptions slaOptions) {
+ this.slaOptions = slaOptions;
+ }
+
+ public String getScheduleName() {
+ return projectName + "." + flowName + " (" + projectId + ")";
+ }
+
+ public String toString() {
+ return projectName + "." + flowName + " (" + projectId + ")" + " to be run at (starting) " +
+ new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
+ }
+
+ public Pair<Integer, String> getScheduleId() {
+ return new Pair<Integer, String>(getProjectId(), getFlowName());
+ }
+
+ public int getProjectId() {
+ return projectId;
+ }
+
+ public String getProjectName() {
+ return projectName;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public long getFirstSchedTime() {
+ return firstSchedTime;
+ }
+
+ public DateTimeZone getTimezone() {
+ return timezone;
+ }
+
+ public long getLastModifyTime() {
+ return lastModifyTime;
+ }
+
+ public ReadablePeriod getPeriod() {
+ return period;
+ }
+
+ public long getNextExecTime() {
+ return nextExecTime;
+ }
+
+ public String getSubmitUser() {
+ return submitUser;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public boolean updateTime() {
+ if (new DateTime(nextExecTime).isAfterNow()) {
+ return true;
+ }
+
+ if (period != null) {
+ DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
+
+ this.nextExecTime = nextTime.getMillis();
+ return true;
+ }
+
+ return false;
+ }
+
+ private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
+ DateTime now = new DateTime();
+ DateTime date = new DateTime(scheduleTime).withZone(timezone);
+ int count = 0;
+ while (!now.isBefore(date)) {
+ if (count > 100000) {
+ throw new IllegalStateException(
+ "100000 increments of period did not get to present time.");
+ }
+
+ if (period == null) {
+ break;
+ } else {
+ date = date.plus(period);
+ }
+
+ count += 1;
+ }
+
+ return date;
+ }
+
+ public static ReadablePeriod parsePeriodString(String periodStr) {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0,
+ periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit);
+ }
+
+ return period;
+ }
+
+ public static String createPeriodString(ReadablePeriod period) {
+ String periodStr = "n";
+
+ if (period == null) {
+ return "n";
+ }
+
+ if (period.get(DurationFieldType.months()) > 0) {
+ int months = period.get(DurationFieldType.months());
+ periodStr = months + "M";
+ } else if (period.get(DurationFieldType.weeks()) > 0) {
+ int weeks = period.get(DurationFieldType.weeks());
+ periodStr = weeks + "w";
+ } else if (period.get(DurationFieldType.days()) > 0) {
+ int days = period.get(DurationFieldType.days());
+ periodStr = days + "d";
+ } else if (period.get(DurationFieldType.hours()) > 0) {
+ int hours = period.get(DurationFieldType.hours());
+ periodStr = hours + "h";
+ } else if (period.get(DurationFieldType.minutes()) > 0) {
+ int minutes = period.get(DurationFieldType.minutes());
+ periodStr = minutes + "m";
+ } else if (period.get(DurationFieldType.seconds()) > 0) {
+ int seconds = period.get(DurationFieldType.seconds());
+ periodStr = seconds + "s";
+ }
+
+ return periodStr;
+ }
+
+
+ public Map<String,Object> optionsToObject() {
+ if(executionOptions != null || slaOptions != null) {
+ HashMap<String, Object> schedObj = new HashMap<String, Object>();
+
+ if(executionOptions != null) {
+ schedObj.put("executionOptions", executionOptions.toObject());
+ }
+ if(slaOptions != null) {
+ schedObj.put("slaOptions", slaOptions.toObject());
+ }
+
+ return schedObj;
+ }
+ return null;
+ }
+
+ public void createAndSetScheduleOptions(Object obj) {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
+ if (schedObj.containsKey("executionOptions")) {
+ ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
+ this.executionOptions = execOptions;
+ }
+ else if (schedObj.containsKey("flowOptions")){
+ ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
+ this.executionOptions = execOptions;
+ execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ }
+ else {
+ this.executionOptions = new ExecutionOptions();
+ this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ }
+
+ if (schedObj.containsKey("slaOptions")) {
+ SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
+ this.slaOptions = slaOptions;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleLoader.java b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
new file mode 100644
index 0000000..6511d9c
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.List;
+
+@Deprecated
+public interface ScheduleLoader {
+
+ public void insertSchedule(Schedule s) throws ScheduleManagerException;
+
+ public void updateSchedule(Schedule s) throws ScheduleManagerException;
+
+ public List<Schedule> loadSchedules() throws ScheduleManagerException;
+
+ public void removeSchedule(Schedule s) throws ScheduleManagerException;
+
+ public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleManagerException.java b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
new file mode 100644
index 0000000..f0f6705
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+@Deprecated
+public class ScheduleManagerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public ScheduleManagerException(String message) {
+ super(message);
+ }
+
+ public ScheduleManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
src/java/azkaban/migration/sla/SLA.java 252(+252 -0)
diff --git a/src/java/azkaban/migration/sla/SLA.java b/src/java/azkaban/migration/sla/SLA.java
new file mode 100644
index 0000000..0e75965
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.migration.scheduler.Schedule;
+
+@Deprecated
+public class SLA {
+
+ public static enum SlaRule {
+ SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+ private int numVal;
+
+ SlaRule(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaRule fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return SUCCESS;
+ case 2:
+ return FINISH;
+ case 3:
+ return WAITANDCHECKJOB;
+ default:
+ return SUCCESS;
+ }
+ }
+ }
+
+ public static enum SlaAction {
+ EMAIL(1), KILL(2);
+
+ private int numVal;
+
+ SlaAction(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaAction fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return EMAIL;
+ case 2:
+ return KILL;
+ default:
+ return EMAIL;
+ }
+ }
+ }
+
+ public static class SlaSetting {
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public ReadablePeriod getDuration() {
+ return duration;
+ }
+ public void setDuration(ReadablePeriod duration) {
+ this.duration = duration;
+ }
+ public SlaRule getRule() {
+ return rule;
+ }
+ public void setRule(SlaRule rule) {
+ this.rule = rule;
+ }
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+ public void setActions(List<SlaAction> actions) {
+ this.actions = actions;
+ }
+
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("id", id);
+ obj.put("duration", Schedule.createPeriodString(duration));
+// List<String> rulesObj = new ArrayList<String>();
+// for(SlaRule rule : rules) {
+// rulesObj.add(rule.toString());
+// }
+// obj.put("rules", rulesObj);
+ obj.put("rule", rule.toString());
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ obj.put("actions", actionsObj);
+ return obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaSetting fromObject(Object obj) {
+ Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+ String subId = (String) slaObj.get("id");
+ ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+// List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+// List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+// for(String rule : rulesObj) {
+// slaRules.add(SlaRule.valueOf(rule));
+// }
+ SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+
+ SlaSetting ret = new SlaSetting();
+ ret.setId(subId);
+ ret.setDuration(dur);
+ ret.setRule(slaRule);
+ ret.setActions(slaActs);
+ return ret;
+ }
+
+ private String id;
+ private ReadablePeriod duration;
+ private SlaRule rule = SlaRule.SUCCESS;
+ private List<SlaAction> actions;
+ }
+
+ private int execId;
+ private String jobName;
+ private DateTime checkTime;
+ private List<String> emails;
+ private List<SlaAction> actions;
+ private List<SlaSetting> jobSettings;
+ private SlaRule rule;
+
+ public SLA(
+ int execId,
+ String jobName,
+ DateTime checkTime,
+ List<String> emails,
+ List<SlaAction> slaActions,
+ List<SlaSetting> jobSettings,
+ SlaRule slaRule
+ ) {
+ this.execId = execId;
+ this.jobName = jobName;
+ this.checkTime = checkTime;
+ this.emails = emails;
+ this.actions = slaActions;
+ this.jobSettings = jobSettings;
+ this.rule = slaRule;
+ }
+
+ public int getExecId() {
+ return execId;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public DateTime getCheckTime() {
+ return checkTime;
+ }
+
+ public List<String> getEmails() {
+ return emails;
+ }
+
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+
+ public List<SlaSetting> getJobSettings() {
+ return jobSettings;
+ }
+
+ public SlaRule getRule() {
+ return rule;
+ }
+
+ public String toString() {
+ return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+ }
+
+ public Map<String,Object> optionToObject() {
+ HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+ slaObj.put("emails", emails);
+// slaObj.put("rule", rule.toString());
+
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ slaObj.put("actions", actionsObj);
+
+ if(jobSettings != null && jobSettings.size() > 0) {
+ List<Object> settingsObj = new ArrayList<Object>();
+ for(SlaSetting set : jobSettings) {
+ settingsObj.add(set.toObject());
+ }
+ slaObj.put("jobSettings", settingsObj);
+ }
+
+ return slaObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+
+ HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+ List<String> emails = (List<String>)slaObj.get("emails");
+// SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+ List<SlaSetting> jobSets = null;
+ if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+ jobSets = new ArrayList<SLA.SlaSetting>();
+ for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+ SlaSetting jobSet = SlaSetting.fromObject(set);
+ jobSets.add(jobSet);
+ }
+ }
+
+ return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+ }
+
+ public void setCheckTime(DateTime time) {
+ this.checkTime = time;
+ }
+
+}
diff --git a/src/java/azkaban/migration/sla/SlaOptions.java b/src/java/azkaban/migration/sla/SlaOptions.java
new file mode 100644
index 0000000..f7b9d49
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SlaOptions.java
@@ -0,0 +1,52 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.migration.sla.SLA.SlaSetting;
+
+@Deprecated
+public class SlaOptions {
+
+ public List<String> getSlaEmails() {
+ return slaEmails;
+ }
+ public void setSlaEmails(List<String> slaEmails) {
+ this.slaEmails = slaEmails;
+ }
+ public List<SlaSetting> getSettings() {
+ return settings;
+ }
+ public void setSettings(List<SlaSetting> settings) {
+ this.settings = settings;
+ }
+ private List<String> slaEmails;
+ private List<SlaSetting> settings;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("slaEmails", slaEmails);
+ List<Object> slaSettings = new ArrayList<Object>();
+ for(SlaSetting s : settings) {
+ slaSettings.add(s.toObject());
+ }
+ obj.put("settings", slaSettings);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static SlaOptions fromObject(Object object) {
+ if(object != null) {
+ SlaOptions slaOptions = new SlaOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ for(Object set: (List<Object>)obj.get("settings")) {
+ slaSets.add(SlaSetting.fromObject(set));
+ }
+ slaOptions.setSettings(slaSets);
+ return slaOptions;
+ }
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/src/package/webserver/bin/schedule2trigger.sh b/src/package/webserver/bin/schedule2trigger.sh
new file mode 100644
index 0000000..1178cf3
--- /dev/null
+++ b/src/package/webserver/bin/schedule2trigger.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+java -cp "lib/*:extlib/*" azkaban.migration.schedule2trigger.Schedule2Trigger conf/azkaban.properties
+