azkaban-memoizeit
Changes
src/java/azkaban/migration/scheduler/Schedule.java 363(+363 -0)
src/java/azkaban/migration/sla/SLA.java 252(+252 -0)
src/web/js/azkaban.jobdetails.view.js 359(+147 -212)
src/web/js/azkaban.logdata.model.js 302(+302 -0)
Details
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 54179fd..2e5de64 100644
--- a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -162,6 +162,13 @@ public class AzkabanProcess {
public void hardKill() {
checkStarted();
if (isRunning()) {
+ if (processId != 0 ) {
+ try {
+ Runtime.getRuntime().exec("kill -9 " + processId);
+ } catch (IOException e) {
+ logger.error("Kill attempt failed.", e);
+ }
+ }
process.destroy();
}
}
diff --git a/src/java/azkaban/migration/schedule2trigger/CommonParams.java b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
new file mode 100644
index 0000000..0408f51
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
@@ -0,0 +1,22 @@
+package azkaban.migration.schedule2trigger;
+
+public class CommonParams {
+ public static final String TYPE_FLOW_FINISH = "FlowFinish";
+ public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
+ public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
+
+ public static final String TYPE_JOB_FINISH = "JobFinish";
+ public static final String TYPE_JOB_SUCCEED = "JobSucceed";
+ public static final String TYPE_JOB_PROGRESS = "JobProgress";
+
+ public static final String INFO_DURATION = "Duration";
+ public static final String INFO_FLOW_NAME = "FlowName";
+ public static final String INFO_JOB_NAME = "JobName";
+ public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
+ public static final String INFO_EMAIL_LIST = "EmailList";
+
+ // always alert
+ public static final String ALERT_TYPE = "SlaAlertType";
+ public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
+ public static final String ACTION_ALERT = "SlaAlert";
+}
diff --git a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
new file mode 100644
index 0000000..6d1bd39
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
@@ -0,0 +1,256 @@
+package azkaban.migration.schedule2trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import azkaban.executor.ExecutionOptions;
+import static azkaban.migration.schedule2trigger.CommonParams.*;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.utils.Utils;
+
+public class Schedule2Trigger {
+
+ private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
+ private static Props props;
+ private static File outputDir;
+
+ public static void main(String[] args) throws Exception{
+ if(args.length < 1) {
+ printUsage();
+ }
+
+ File confFile = new File(args[0]);
+ try {
+ logger.info("Trying to load config from " + confFile.getAbsolutePath());
+ props = loadAzkabanConfig(confFile);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ try {
+ outputDir = File.createTempFile("schedules", null);
+ logger.info("Creating temp dir for dumping existing schedules.");
+ outputDir.delete();
+ outputDir.mkdir();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ try {
+ schedule2File();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ try {
+ file2ScheduleTrigger();
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error(e);
+ return;
+ }
+
+ logger.info("Uploaded all schedules. Removing temp dir.");
+ FileUtils.deleteDirectory(outputDir);
+ System.exit(0);
+ }
+
+ private static Props loadAzkabanConfig(File confFile) throws IOException {
+ return new Props(null, confFile);
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: schedule2Trigger PATH_TO_CONFIG_FILE");
+ }
+
+ private static void schedule2File() throws Exception {
+ azkaban.migration.scheduler.ScheduleLoader scheduleLoader = new azkaban.migration.scheduler.JdbcScheduleLoader(props);
+ logger.info("Loading old schedule info from DB.");
+ List<azkaban.migration.scheduler.Schedule> schedules = scheduleLoader.loadSchedules();
+ for(azkaban.migration.scheduler.Schedule sched : schedules) {
+ writeScheduleFile(sched, outputDir);
+ }
+ }
+
+ private static void writeScheduleFile(azkaban.migration.scheduler.Schedule sched, File outputDir) throws IOException {
+ String scheduleFileName = sched.getProjectName()+"-"+sched.getFlowName();
+ File outputFile = new File(outputDir, scheduleFileName);
+ outputFile.createNewFile();
+ Props props = new Props();
+ props.put("flowName", sched.getFlowName());
+ props.put("projectName", sched.getProjectName());
+ props.put("projectId", String.valueOf(sched.getProjectId()));
+ props.put("period", azkaban.migration.scheduler.Schedule.createPeriodString(sched.getPeriod()));
+ props.put("firstScheduleTimeLong", sched.getFirstSchedTime());
+ props.put("timezone", sched.getTimezone().getID());
+ props.put("submitUser", sched.getSubmitUser());
+ props.put("submitTimeLong", sched.getSubmitTime());
+ props.put("nextExecTimeLong", sched.getNextExecTime());
+
+ ExecutionOptions executionOptions = sched.getExecutionOptions();
+ if(executionOptions != null) {
+ props.put("executionOptionsObj", JSONUtils.toJSON(executionOptions.toObject()));
+ }
+
+ azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
+ if(slaOptions != null) {
+
+ List<Map<String, Object>> settingsObj = new ArrayList<Map<String,Object>>();
+ List<azkaban.migration.sla.SLA.SlaSetting> settings = slaOptions.getSettings();
+ for(azkaban.migration.sla.SLA.SlaSetting set : settings) {
+ Map<String, Object> setObj = new HashMap<String, Object>();
+ String setId = set.getId();
+ azkaban.migration.sla.SLA.SlaRule rule = set.getRule();
+ Map<String, Object> info = new HashMap<String, Object>();
+ info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule.createPeriodString(set.getDuration()));
+ info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());
+ List<String> actionsList = new ArrayList<String>();
+ for(azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
+ if(act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
+ actionsList.add(ACTION_ALERT);
+ info.put(ALERT_TYPE, "email");
+ } else if(act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
+ actionsList.add(ACTION_CANCEL_FLOW);
+ }
+ }
+ setObj.put("actions", actionsList);
+ if(setId.equals("")) {
+ info.put(INFO_FLOW_NAME, sched.getFlowName());
+ if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+ setObj.put("type", TYPE_FLOW_FINISH);
+ } else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+ setObj.put("type", TYPE_FLOW_SUCCEED);
+ }
+ } else {
+ info.put(INFO_JOB_NAME, setId);
+ if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+ setObj.put("type", TYPE_JOB_FINISH);
+ } else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+ setObj.put("type", TYPE_JOB_SUCCEED);
+ }
+ }
+ setObj.put("info", info);
+ settingsObj.add(setObj);
+ }
+
+ props.put("slaOptionsObj", JSONUtils.toJSON(settingsObj));
+ }
+ props.storeLocal(outputFile);
+ }
+
+ private static void file2ScheduleTrigger() throws Exception {
+
+ TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
+ for(File scheduleFile : outputDir.listFiles()) {
+ logger.info("Trying to load schedule from " + scheduleFile.getAbsolutePath());
+ if(scheduleFile.isFile()) {
+ Props schedProps = new Props(null, scheduleFile);
+ String flowName = schedProps.getString("flowName");
+ String projectName = schedProps.getString("projectName");
+ int projectId = schedProps.getInt("projectId");
+ long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
+// DateTime firstSchedTime = new DateTime(firstSchedTimeLong);
+ String timezoneId = schedProps.getString("timezone");
+ DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+ ReadablePeriod period = Utils.parsePeriodString(schedProps.getString("period"));
+// DateTime lastModifyTime = DateTime.now();
+ long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
+// DateTime nextExecTime = new DateTime(nextExecTimeLong);
+ long submitTimeLong = schedProps.getLong("submitTimeLong");
+// DateTime submitTime = new DateTime(submitTimeLong);
+ String submitUser = schedProps.getString("submitUser");
+ ExecutionOptions executionOptions = null;
+ if(schedProps.containsKey("executionOptionsObj")) {
+ String executionOptionsObj = schedProps.getString("executionOptionsObj");
+ executionOptions = ExecutionOptions.createFromObject(JSONUtils.parseJSONFromString(executionOptionsObj));
+ } else {
+ executionOptions = new ExecutionOptions();
+ }
+ List<azkaban.sla.SlaOption> slaOptions = null;
+ if(schedProps.containsKey("slaOptionsObj")) {
+ slaOptions = new ArrayList<azkaban.sla.SlaOption>();
+ List<Map<String, Object>> settingsObj = (List<Map<String, Object>>) JSONUtils.parseJSONFromString(schedProps.getString("slaOptionsObj"));
+ for(Map<String, Object> sla : settingsObj) {
+ String type = (String) sla.get("type");
+ Map<String, Object> info = (Map<String, Object>) sla.get("info");
+ List<String> actions = (List<String>) sla.get("actions");
+ azkaban.sla.SlaOption slaOption = new azkaban.sla.SlaOption(type, actions, info);
+ slaOptions.add(slaOption);
+ }
+ }
+
+ azkaban.scheduler.Schedule schedule = new azkaban.scheduler.Schedule(-1, projectId, projectName, flowName, "ready", firstSchedTimeLong, timezone, period, DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong, submitUser, executionOptions, slaOptions);
+ Trigger t = scheduleToTrigger(schedule);
+ logger.info("Ready to insert trigger " + t.getDescription());
+ triggerLoader.addTrigger(t);
+
+ }
+
+ }
+ }
+
+
+ private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
+
+ Condition triggerCondition = createTimeTriggerCondition(s);
+ Condition expireCondition = createTimeExpireCondition(s);
+ List<TriggerAction> actions = createActions(s);
+ Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), azkaban.scheduler.ScheduleManager.triggerSource, triggerCondition, expireCondition, actions);
+ if(s.isRecurring()) {
+ t.setResetOnTrigger(true);
+ }
+ return t;
+ }
+
+ private static List<TriggerAction> createActions (azkaban.scheduler.Schedule s) {
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
+ actions.add(executeAct);
+
+ return actions;
+ }
+
+ private static Condition createTimeTriggerCondition (azkaban.scheduler.Schedule s) {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ checkers.put(checker.getId(), checker);
+ String expr = checker.getId() + ".eval()";
+ Condition cond = new Condition(checkers, expr);
+ return cond;
+ }
+
+ // if failed to trigger, auto expire?
+ private static Condition createTimeExpireCondition (azkaban.scheduler.Schedule s) {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ checkers.put(checker.getId(), checker);
+ String expr = checker.getId() + ".eval()";
+ Condition cond = new Condition(checkers, expr);
+ return cond;
+ }
+
+}
diff --git a/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
new file mode 100644
index 0000000..bcef168
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+@Deprecated
+public class JdbcScheduleLoader implements ScheduleLoader {
+
+ private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
+
+ public static enum EncodingType {
+ PLAIN(1), GZIP(2);
+
+ private int numVal;
+
+ EncodingType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EncodingType fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return PLAIN;
+ case 2:
+ return GZIP;
+ default:
+ return PLAIN;
+ }
+ }
+ }
+
+ private DataSource dataSource;
+ private EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ private static final String scheduleTableName = "schedules";
+
+ private static String SELECT_ALL_SCHEDULES =
+ "SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
+
+ private static String INSERT_SCHEDULE =
+ "INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+ private static String REMOVE_SCHEDULE_BY_KEY =
+ "DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
+
+ private static String UPDATE_SCHEDULE_BY_KEY =
+ "UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
+
+ private static String UPDATE_NEXT_EXEC_TIME =
+ "UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+
+ private Connection getConnection() throws ScheduleManagerException {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new ScheduleManagerException("Error getting DB connection.", e);
+ }
+
+ return connection;
+ }
+
+ public EncodingType getDefaultEncodingType() {
+ return defaultEncodingType;
+ }
+
+ public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ this.defaultEncodingType = defaultEncodingType;
+ }
+
+ public JdbcScheduleLoader(Props props) {
+ String databaseType = props.getString("database.type");
+
+ if (databaseType.equals("mysql")) {
+ int port = props.getInt("mysql.port");
+ String host = props.getString("mysql.host");
+ String database = props.getString("mysql.database");
+ String user = props.getString("mysql.user");
+ String password = props.getString("mysql.password");
+ int numConnections = props.getInt("mysql.numconnections");
+
+ dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ }
+ }
+
+ @Override
+ public List<Schedule> loadSchedules() throws ScheduleManagerException {
+ logger.info("Loading all schedules from db.");
+ Connection connection = getConnection();
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();
+
+ List<Schedule> schedules;
+
+ try {
+ schedules = runner.query(connection, SELECT_ALL_SCHEDULES, handler);
+ } catch (SQLException e) {
+ logger.error(SELECT_ALL_SCHEDULES + " failed.");
+
+ DbUtils.closeQuietly(connection);
+ throw new ScheduleManagerException("Loading schedules from db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ logger.info("Now trying to update the schedules");
+
+ // filter the schedules
+ Iterator<Schedule> scheduleIterator = schedules.iterator();
+ while (scheduleIterator.hasNext()) {
+ Schedule sched = scheduleIterator.next();
+ if(!sched.updateTime()) {
+ logger.info("Schedule " + sched.getScheduleName() + " was scheduled before azkaban start, skipping it.");
+ scheduleIterator.remove();
+ removeSchedule(sched);
+ }
+ else {
+ logger.info("Recurring schedule, need to update next exec time");
+ try {
+ updateNextExecTime(sched);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new ScheduleManagerException("Update next execution time failed.", e);
+ }
+ logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
+ }
+ }
+
+
+
+ logger.info("Loaded " + schedules.size() + " schedules.");
+
+ return schedules;
+ }
+
+ @Override
+ public void removeSchedule(Schedule s) throws ScheduleManagerException {
+ logger.info("Removing schedule " + s.getScheduleName() + " from db.");
+
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ int removes = runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
+ if (removes == 0) {
+ throw new ScheduleManagerException("No schedule has been removed.");
+ }
+ } catch (SQLException e) {
+ logger.error(REMOVE_SCHEDULE_BY_KEY + " failed.");
+ throw new ScheduleManagerException("Remove schedule " + s.getScheduleName() + " from db failed. ", e);
+ }
+ }
+
+
+ public void insertSchedule(Schedule s) throws ScheduleManagerException {
+ logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
+ insertSchedule(s, defaultEncodingType);
+ }
+
+ public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+ String json = JSONUtils.toJSON(s.optionsToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
+ }
+
+ QueryRunner runner = new QueryRunner(dataSource);
+ try {
+ int inserts = runner.update(
+ INSERT_SCHEDULE,
+ s.getProjectId(),
+ s.getProjectName(),
+ s.getFlowName(),
+ s.getStatus(),
+ s.getFirstSchedTime(),
+ s.getTimezone().getID(),
+ Schedule.createPeriodString(s.getPeriod()),
+ s.getLastModifyTime(),
+ s.getNextExecTime(),
+ s.getSubmitTime(),
+ s.getSubmitUser(),
+ encType.getNumVal(),
+ data);
+ if (inserts == 0) {
+ throw new ScheduleManagerException("No schedule has been inserted.");
+ }
+ } catch (SQLException e) {
+ logger.error(INSERT_SCHEDULE + " failed.");
+ throw new ScheduleManagerException("Insert schedule " + s.getScheduleName() + " into db failed. ", e);
+ }
+ }
+
+ @Override
+ public void updateNextExecTime(Schedule s) throws ScheduleManagerException
+ {
+ logger.info("Update schedule " + s.getScheduleName() + " into db. ");
+ Connection connection = getConnection();
+ QueryRunner runner = new QueryRunner();
+ try {
+
+ runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
+ throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ @Override
+ public void updateSchedule(Schedule s) throws ScheduleManagerException {
+ logger.info("Updating schedule " + s.getScheduleName() + " into db.");
+ updateSchedule(s, defaultEncodingType);
+ }
+
+ public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+ String json = JSONUtils.toJSON(s.optionsToObject());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
+ }
+
+ QueryRunner runner = new QueryRunner(dataSource);
+
+ try {
+ int updates = runner.update(
+ UPDATE_SCHEDULE_BY_KEY,
+ s.getStatus(),
+ s.getFirstSchedTime(),
+ s.getTimezone().getID(),
+ Schedule.createPeriodString(s.getPeriod()),
+ s.getLastModifyTime(),
+ s.getNextExecTime(),
+ s.getSubmitTime(),
+ s.getSubmitUser(),
+ encType.getNumVal(),
+ data,
+ s.getProjectId(),
+ s.getFlowName());
+ if (updates == 0) {
+ throw new ScheduleManagerException("No schedule has been updated.");
+ }
+ } catch (SQLException e) {
+ logger.error(UPDATE_SCHEDULE_BY_KEY + " failed.");
+ throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+ }
+ }
+
+ public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
+ @Override
+ public List<Schedule> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<Schedule>emptyList();
+ }
+
+ ArrayList<Schedule> schedules = new ArrayList<Schedule>();
+ do {
+ int projectId = rs.getInt(1);
+ String projectName = rs.getString(2);
+ String flowName = rs.getString(3);
+ String status = rs.getString(4);
+ long firstSchedTime = rs.getLong(5);
+ DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
+ ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
+ long lastModifyTime = rs.getLong(8);
+ long nextExecTime = rs.getLong(9);
+ long submitTime = rs.getLong(10);
+ String submitUser = rs.getString(11);
+ int encodingType = rs.getInt(12);
+ byte[] data = rs.getBytes(13);
+
+ Object optsObj = null;
+ if (data != null) {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ optsObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ } catch (IOException e) {
+ throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
+ }
+ }
+
+ Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+ if (optsObj != null) {
+ s.createAndSetScheduleOptions(optsObj);
+ }
+
+ schedules.add(s);
+ } while (rs.next());
+
+ return schedules;
+ }
+
+ }
+}
\ No newline at end of file
src/java/azkaban/migration/scheduler/Schedule.java 363(+363 -0)
diff --git a/src/java/azkaban/migration/scheduler/Schedule.java b/src/java/azkaban/migration/scheduler/Schedule.java
new file mode 100644
index 0000000..9490243
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/Schedule.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.migration.sla.SlaOptions;
+import azkaban.utils.Pair;
+
+@Deprecated
+public class Schedule{
+
+// private long projectGuid;
+// private long flowGuid;
+
+// private String scheduleId;
+
+ private int projectId;
+ private String projectName;
+ private String flowName;
+ private long firstSchedTime;
+ private DateTimeZone timezone;
+ private long lastModifyTime;
+ private ReadablePeriod period;
+ private long nextExecTime;
+ private String submitUser;
+ private String status;
+ private long submitTime;
+
+ private ExecutionOptions executionOptions;
+ private SlaOptions slaOptions;
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = timezone;
+ this.lastModifyTime = lastModifyTime;
+ this.period = period;
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.executionOptions = null;
+ this.slaOptions = null;
+ }
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ String timezoneId,
+ String period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ ExecutionOptions executionOptions,
+ SlaOptions slaOptions
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = DateTimeZone.forID(timezoneId);
+ this.lastModifyTime = lastModifyTime;
+ this.period = parsePeriodString(period);
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.executionOptions = executionOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public Schedule(
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ ExecutionOptions executionOptions,
+ SlaOptions slaOptions
+ ) {
+ this.projectId = projectId;
+ this.projectName = projectName;
+ this.flowName = flowName;
+ this.firstSchedTime = firstSchedTime;
+ this.timezone = timezone;
+ this.lastModifyTime = lastModifyTime;
+ this.period = period;
+ this.nextExecTime = nextExecTime;
+ this.submitUser = submitUser;
+ this.status = status;
+ this.submitTime = submitTime;
+ this.executionOptions = executionOptions;
+ this.slaOptions = slaOptions;
+ }
+
+ public ExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
+ public void setFlowOptions(ExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ }
+
+ public SlaOptions getSlaOptions() {
+ return slaOptions;
+ }
+
+ public void setSlaOptions(SlaOptions slaOptions) {
+ this.slaOptions = slaOptions;
+ }
+
+ public String getScheduleName() {
+ return projectName + "." + flowName + " (" + projectId + ")";
+ }
+
+ public String toString() {
+ return projectName + "." + flowName + " (" + projectId + ")" + " to be run at (starting) " +
+ new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
+ }
+
+ public Pair<Integer, String> getScheduleId() {
+ return new Pair<Integer, String>(getProjectId(), getFlowName());
+ }
+
+ public int getProjectId() {
+ return projectId;
+ }
+
+ public String getProjectName() {
+ return projectName;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public long getFirstSchedTime() {
+ return firstSchedTime;
+ }
+
+ public DateTimeZone getTimezone() {
+ return timezone;
+ }
+
+ public long getLastModifyTime() {
+ return lastModifyTime;
+ }
+
+ public ReadablePeriod getPeriod() {
+ return period;
+ }
+
+ public long getNextExecTime() {
+ return nextExecTime;
+ }
+
+ public String getSubmitUser() {
+ return submitUser;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public boolean updateTime() {
+ if (new DateTime(nextExecTime).isAfterNow()) {
+ return true;
+ }
+
+ if (period != null) {
+ DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
+
+ this.nextExecTime = nextTime.getMillis();
+ return true;
+ }
+
+ return false;
+ }
+
+ private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
+ DateTime now = new DateTime();
+ DateTime date = new DateTime(scheduleTime).withZone(timezone);
+ int count = 0;
+ while (!now.isBefore(date)) {
+ if (count > 100000) {
+ throw new IllegalStateException(
+ "100000 increments of period did not get to present time.");
+ }
+
+ if (period == null) {
+ break;
+ } else {
+ date = date.plus(period);
+ }
+
+ count += 1;
+ }
+
+ return date;
+ }
+
+ public static ReadablePeriod parsePeriodString(String periodStr) {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0,
+ periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit);
+ }
+
+ return period;
+ }
+
+ public static String createPeriodString(ReadablePeriod period) {
+ String periodStr = "n";
+
+ if (period == null) {
+ return "n";
+ }
+
+ if (period.get(DurationFieldType.months()) > 0) {
+ int months = period.get(DurationFieldType.months());
+ periodStr = months + "M";
+ } else if (period.get(DurationFieldType.weeks()) > 0) {
+ int weeks = period.get(DurationFieldType.weeks());
+ periodStr = weeks + "w";
+ } else if (period.get(DurationFieldType.days()) > 0) {
+ int days = period.get(DurationFieldType.days());
+ periodStr = days + "d";
+ } else if (period.get(DurationFieldType.hours()) > 0) {
+ int hours = period.get(DurationFieldType.hours());
+ periodStr = hours + "h";
+ } else if (period.get(DurationFieldType.minutes()) > 0) {
+ int minutes = period.get(DurationFieldType.minutes());
+ periodStr = minutes + "m";
+ } else if (period.get(DurationFieldType.seconds()) > 0) {
+ int seconds = period.get(DurationFieldType.seconds());
+ periodStr = seconds + "s";
+ }
+
+ return periodStr;
+ }
+
+
+ public Map<String,Object> optionsToObject() {
+ if(executionOptions != null || slaOptions != null) {
+ HashMap<String, Object> schedObj = new HashMap<String, Object>();
+
+ if(executionOptions != null) {
+ schedObj.put("executionOptions", executionOptions.toObject());
+ }
+ if(slaOptions != null) {
+ schedObj.put("slaOptions", slaOptions.toObject());
+ }
+
+ return schedObj;
+ }
+ return null;
+ }
+
+ public void createAndSetScheduleOptions(Object obj) {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
+ if (schedObj.containsKey("executionOptions")) {
+ ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
+ this.executionOptions = execOptions;
+ }
+ else if (schedObj.containsKey("flowOptions")){
+ ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
+ this.executionOptions = execOptions;
+ execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ }
+ else {
+ this.executionOptions = new ExecutionOptions();
+ this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ }
+
+ if (schedObj.containsKey("slaOptions")) {
+ SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
+ this.slaOptions = slaOptions;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleLoader.java b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
new file mode 100644
index 0000000..6511d9c
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.List;
+
+@Deprecated
+public interface ScheduleLoader {
+
+ public void insertSchedule(Schedule s) throws ScheduleManagerException;
+
+ public void updateSchedule(Schedule s) throws ScheduleManagerException;
+
+ public List<Schedule> loadSchedules() throws ScheduleManagerException;
+
+ public void removeSchedule(Schedule s) throws ScheduleManagerException;
+
+ public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleManagerException.java b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
new file mode 100644
index 0000000..f0f6705
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+@Deprecated
+public class ScheduleManagerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public ScheduleManagerException(String message) {
+ super(message);
+ }
+
+ public ScheduleManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
src/java/azkaban/migration/sla/SLA.java 252(+252 -0)
diff --git a/src/java/azkaban/migration/sla/SLA.java b/src/java/azkaban/migration/sla/SLA.java
new file mode 100644
index 0000000..0e75965
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.migration.scheduler.Schedule;
+
+@Deprecated
+public class SLA {
+
+ public static enum SlaRule {
+ SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+ private int numVal;
+
+ SlaRule(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaRule fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return SUCCESS;
+ case 2:
+ return FINISH;
+ case 3:
+ return WAITANDCHECKJOB;
+ default:
+ return SUCCESS;
+ }
+ }
+ }
+
+ public static enum SlaAction {
+ EMAIL(1), KILL(2);
+
+ private int numVal;
+
+ SlaAction(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static SlaAction fromInteger(int x) {
+ switch (x) {
+ case 1:
+ return EMAIL;
+ case 2:
+ return KILL;
+ default:
+ return EMAIL;
+ }
+ }
+ }
+
+ public static class SlaSetting {
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public ReadablePeriod getDuration() {
+ return duration;
+ }
+ public void setDuration(ReadablePeriod duration) {
+ this.duration = duration;
+ }
+ public SlaRule getRule() {
+ return rule;
+ }
+ public void setRule(SlaRule rule) {
+ this.rule = rule;
+ }
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+ public void setActions(List<SlaAction> actions) {
+ this.actions = actions;
+ }
+
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("id", id);
+ obj.put("duration", Schedule.createPeriodString(duration));
+// List<String> rulesObj = new ArrayList<String>();
+// for(SlaRule rule : rules) {
+// rulesObj.add(rule.toString());
+// }
+// obj.put("rules", rulesObj);
+ obj.put("rule", rule.toString());
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ obj.put("actions", actionsObj);
+ return obj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SlaSetting fromObject(Object obj) {
+ Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+ String subId = (String) slaObj.get("id");
+ ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+// List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+// List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+// for(String rule : rulesObj) {
+// slaRules.add(SlaRule.valueOf(rule));
+// }
+ SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+
+ SlaSetting ret = new SlaSetting();
+ ret.setId(subId);
+ ret.setDuration(dur);
+ ret.setRule(slaRule);
+ ret.setActions(slaActs);
+ return ret;
+ }
+
+ private String id;
+ private ReadablePeriod duration;
+ private SlaRule rule = SlaRule.SUCCESS;
+ private List<SlaAction> actions;
+ }
+
+ private int execId;
+ private String jobName;
+ private DateTime checkTime;
+ private List<String> emails;
+ private List<SlaAction> actions;
+ private List<SlaSetting> jobSettings;
+ private SlaRule rule;
+
+ public SLA(
+ int execId,
+ String jobName,
+ DateTime checkTime,
+ List<String> emails,
+ List<SlaAction> slaActions,
+ List<SlaSetting> jobSettings,
+ SlaRule slaRule
+ ) {
+ this.execId = execId;
+ this.jobName = jobName;
+ this.checkTime = checkTime;
+ this.emails = emails;
+ this.actions = slaActions;
+ this.jobSettings = jobSettings;
+ this.rule = slaRule;
+ }
+
+ public int getExecId() {
+ return execId;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public DateTime getCheckTime() {
+ return checkTime;
+ }
+
+ public List<String> getEmails() {
+ return emails;
+ }
+
+ public List<SlaAction> getActions() {
+ return actions;
+ }
+
+ public List<SlaSetting> getJobSettings() {
+ return jobSettings;
+ }
+
+ public SlaRule getRule() {
+ return rule;
+ }
+
+ public String toString() {
+ return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+ }
+
+ public Map<String,Object> optionToObject() {
+ HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+ slaObj.put("emails", emails);
+// slaObj.put("rule", rule.toString());
+
+ List<String> actionsObj = new ArrayList<String>();
+ for(SlaAction act : actions) {
+ actionsObj.add(act.toString());
+ }
+ slaObj.put("actions", actionsObj);
+
+ if(jobSettings != null && jobSettings.size() > 0) {
+ List<Object> settingsObj = new ArrayList<Object>();
+ for(SlaSetting set : jobSettings) {
+ settingsObj.add(set.toObject());
+ }
+ slaObj.put("jobSettings", settingsObj);
+ }
+
+ return slaObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+
+ HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+ List<String> emails = (List<String>)slaObj.get("emails");
+// SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+ List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+ List<SlaAction> slaActs = new ArrayList<SlaAction>();
+ for(String act : actsObj) {
+ slaActs.add(SlaAction.valueOf(act));
+ }
+ List<SlaSetting> jobSets = null;
+ if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+ jobSets = new ArrayList<SLA.SlaSetting>();
+ for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+ SlaSetting jobSet = SlaSetting.fromObject(set);
+ jobSets.add(jobSet);
+ }
+ }
+
+ return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+ }
+
+ public void setCheckTime(DateTime time) {
+ this.checkTime = time;
+ }
+
+}
diff --git a/src/java/azkaban/migration/sla/SlaOptions.java b/src/java/azkaban/migration/sla/SlaOptions.java
new file mode 100644
index 0000000..f7b9d49
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SlaOptions.java
@@ -0,0 +1,52 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.migration.sla.SLA.SlaSetting;
+
+@Deprecated
+public class SlaOptions {
+
+ public List<String> getSlaEmails() {
+ return slaEmails;
+ }
+ public void setSlaEmails(List<String> slaEmails) {
+ this.slaEmails = slaEmails;
+ }
+ public List<SlaSetting> getSettings() {
+ return settings;
+ }
+ public void setSettings(List<SlaSetting> settings) {
+ this.settings = settings;
+ }
+ private List<String> slaEmails;
+ private List<SlaSetting> settings;
+ public Object toObject() {
+ Map<String, Object> obj = new HashMap<String, Object>();
+ obj.put("slaEmails", slaEmails);
+ List<Object> slaSettings = new ArrayList<Object>();
+ for(SlaSetting s : settings) {
+ slaSettings.add(s.toObject());
+ }
+ obj.put("settings", slaSettings);
+ return obj;
+ }
+ @SuppressWarnings("unchecked")
+ public static SlaOptions fromObject(Object object) {
+ if(object != null) {
+ SlaOptions slaOptions = new SlaOptions();
+ Map<String, Object> obj = (HashMap<String, Object>) object;
+ slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+ List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+ for(Object set: (List<Object>)obj.get("settings")) {
+ slaSets.add(SlaSetting.fromObject(set));
+ }
+ slaOptions.setSettings(slaSets);
+ return slaOptions;
+ }
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 51c0dab..a0adf57 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -45,7 +45,6 @@ import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.LogSummary;
import azkaban.utils.JSONUtils;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
@@ -133,12 +132,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("fetchExecJobLogs")) {
ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
}
+<<<<<<< HEAD
else if (ajaxName.equals("fetchExecJobSummary")) {
ajaxFetchJobSummary(req, resp, ret, session.getUser(), exFlow);
}
else if (ajaxName.equals("fetchExecJobStats")) {
ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
}
+=======
+>>>>>>> master
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
@@ -459,53 +461,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
- /**
- * Gets the job summary.
- *
- * @param req
- * @param resp
- * @param user
- * @param exFlow
- * @throws ServletException
- */
- private void ajaxFetchJobSummary(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
- Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
- if (project == null) {
- return;
- }
-
- String jobId = this.getParam(req, "jobId");
- resp.setCharacterEncoding("utf-8");
-
- try {
- ExecutableNode node = exFlow.getExecutableNode(jobId);
- if (node == null) {
- ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
- return;
- }
-
- int attempt = this.getIntParam(req, "attempt", node.getAttempt());
- LogData data = executorManager.getExecutionJobLog(exFlow, jobId, 0, Integer.MAX_VALUE, attempt);
-
- LogSummary summary = new LogSummary(data);
- ret.put("commandProperties", summary.getCommandProperties());
-
- String jobType = summary.getJobType();
-
- if (jobType.contains("pig")) {
- ret.put("summaryTableHeaders", summary.getPigSummaryTableHeaders());
- ret.put("summaryTableData", summary.getPigSummaryTableData());
- ret.put("statTableHeaders", summary.getPigStatTableHeaders());
- ret.put("statTableData", summary.getPigStatTableData());
- } else if (jobType.contains("hive")) {
- ret.put("hiveQueries", summary.getHiveQueries());
- ret.put("hiveQueryJobs", summary.getHiveQueryJobs());
- }
- } catch (ExecutorManagerException e) {
- throw new ServletException(e);
- }
- }
-
private void ajaxFetchJobStats(
HttpServletRequest req,
HttpServletResponse resp,
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 9b1cd51..4f8aa93 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -22,6 +22,7 @@
#parse("azkaban/webapp/servlet/velocity/javascript.vm")
<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.logdata.model.js"></script>
<script type="text/javascript" src="${context}/js/azkaban.jobdetails.view.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
@@ -110,44 +111,56 @@
## Job Summary
<div class="container-full" id="jobSummaryView">
- <div class="row">
- <div class="col-xs-12">
- <table id="commandTable" class="table table-striped table-bordered table-hover">
- </table>
-
- <div class="panel panel-default" id="jobsummary">
- <div class="panel-heading">Job Summary</div>
- <table class="table table-striped table-bordered table-hover">
- <thead id="summaryHeader">
- </thead>
- <tbody id="summaryBody">
- </tbody>
- </table>
- </div>
-
- <div class="panel panel-default" id="jobstats">
- <div class="panel-heading">Job Stats</div>
- <div class="panel-body panel-body-stats">
- <table class="table table-striped table-bordered table-hover table-condensed">
- <thead id="statsHeader">
+ <div class="row">
+ <div class="col-lg-12">
+ <div class="panel panel-default">
+ <div class="panel-heading">
+ <div class="pull-left">
+ <button type="button" id="updateSummaryBtn" class="btn btn-xs btn-default">Refresh</button>
+ </div>
+
+ </div>
+
+ <div class="panel panel-default" id="commandSummary">
+ <div class="panel-heading">Command Summary</div>
+ <table id="commandTable" class="table table-striped table-bordered table-hover">
+ </table>
+ </div>
+
+ <div class="panel panel-default" id="pigJobSummary">
+ <div class="panel-heading">Pig Job Summary</div>
+ <table class="table table-striped table-bordered table-hover">
+ <thead id="summaryHeader">
</thead>
- <tbody id="statsBody">
+ <tbody id="summaryBody">
</tbody>
</table>
</div>
- </div>
-
- <div class="panel panel-default" id="hiveTable">
- <div class="panel-heading">Job Summary</div>
- <table class="table table-striped table-bordered table-hover">
- <thead id="hiveTableHeader">
- </thead>
- <tbody id="hiveTableBody">
- </tbody>
- </table>
- </div>
- </div>
- </div>
+
+ <div class="panel panel-default" id="pigJobStats">
+ <div class="panel-heading">Pig Job Stats</div>
+ <div class="panel-body panel-body-stats">
+ <table class="table table-striped table-bordered table-hover table-condensed">
+ <thead id="statsHeader">
+ </thead>
+ <tbody id="statsBody">
+ </tbody>
+ </table>
+ </div>
+ </div>
+
+ <div class="panel panel-default" id="hiveJobSummary">
+ <div class="panel-heading">Hive Job Summary</div>
+ <table class="table table-striped table-bordered table-hover" id="hiveTable">
+ <thead id="hiveTableHeader">
+ </thead>
+ <tbody id="hiveTableBody">
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+ </div>
</div>
## Error message message dialog.
diff --git a/src/package/webserver/bin/schedule2trigger.sh b/src/package/webserver/bin/schedule2trigger.sh
new file mode 100644
index 0000000..1178cf3
--- /dev/null
+++ b/src/package/webserver/bin/schedule2trigger.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+java -cp "lib/*:extlib/*" azkaban.migration.schedule2trigger.Schedule2Trigger conf/azkaban.properties
+
src/web/js/azkaban.jobdetails.view.js 359(+147 -212)
diff --git a/src/web/js/azkaban.jobdetails.view.js b/src/web/js/azkaban.jobdetails.view.js
index 3f2b7a2..f191f37 100644
--- a/src/web/js/azkaban.jobdetails.view.js
+++ b/src/web/js/azkaban.jobdetails.view.js
@@ -16,247 +16,184 @@
$.namespace('azkaban');
-var logModel;
-azkaban.LogModel = Backbone.Model.extend({});
-
var jobLogView;
azkaban.JobLogView = Backbone.View.extend({
events: {
- "click #updateLogBtn" : "handleUpdate"
- },
- initialize: function(settings) {
- this.model.set({"offset": 0});
- this.handleUpdate();
+ "click #updateLogBtn" : "refresh"
},
- handleUpdate: function(evt) {
- var requestURL = contextURL + "/executor";
- var model = this.model;
- var finished = false;
-
- var date = new Date();
- var startTime = date.getTime();
-
- while (!finished) {
- var offset = this.model.get("offset");
- var requestData = {
- "execid": execId,
- "jobId": jobId,
- "ajax":"fetchExecJobLogs",
- "offset": offset,
- "length": 50000,
- "attempt": attempt
- };
-
- var successHandler = function(data) {
- console.log("fetchLogs");
- if (data.error) {
- console.log(data.error);
- finished = true;
- }
- else if (data.length == 0) {
- finished = true;
- }
- else {
- var date = new Date();
- var endTime = date.getTime();
- if ((endTime - startTime) > 10000) {
- finished = true;
- showDialog("Alert","The log is taking a long time to finish loading. Azkaban has stopped loading them. Please click Refresh to restart the load.");
- }
- var re = /(https?:\/\/(([-\w\.]+)+(:\d+)?(\/([\w/_\.]*(\?\S+)?)?)?))/g;
- var log = $("#logSection").text();
- if (!log) {
- log = data.data;
- }
- else {
- log += data.data;
- }
-
- var newOffset = data.offset + data.length;
- $("#logSection").text(log);
- log = $("#logSection").html();
- log = log.replace(re, "<a href=\"$1\" title=\"\">$1</a>");
- $("#logSection").html(log);
+ initialize: function() {
+ this.listenTo(this.model, "change:logData", this.render);
+ },
- model.set({"offset": newOffset, "log": log});
- $(".logViewer").scrollTop(9999);
- }
- }
+ refresh: function() {
+ this.model.refresh();
+ },
- $.ajax({
- url: requestURL,
- type: "get",
- async: false,
- data: requestData,
- dataType: "json",
- error: function(data) {
- console.log(data);
- finished = true;
- },
- success: successHandler
- });
- }
+ render: function() {
+ var re = /(https?:\/\/(([-\w\.]+)+(:\d+)?(\/([\w/_\.]*(\?\S+)?)?)?))/g;
+ var log = this.model.get("logData");
+ log = log.replace(re, "<a href=\"$1\" title=\"\">$1</a>");
+ $("#logSection").html(log);
}
});
-var summaryModel;
-azkaban.SummaryModel = Backbone.Model.extend({});
-
var jobSummaryView;
azkaban.JobSummaryView = Backbone.View.extend({
events: {
- "click #updateSummaryBtn" : "handleUpdate"
+ "click #updateSummaryBtn" : "refresh"
},
+
initialize: function(settings) {
- this.handleUpdate();
+ $("#commandSummary").hide();
+ $("#pigJobSummary").hide();
+ $("#pigJobStats").hide();
+ $("#hiveJobSummary").hide();
+
+ this.listenTo(this.model, "change:commandProperties", this.renderCommandTable);
+ this.listenTo(this.model, "change:pigSummary", this.renderPigSummaryTable);
+ this.listenTo(this.model, "change:pigStats", this.renderPigStatsTable);
+ this.listenTo(this.model, "change:hiveSummary", this.renderHiveTable);
},
- handleUpdate: function(evt) {
- var requestURL = contextURL + "/executor";
- var model = this.model;
- var self = this;
- var requestData = {
- "execid": execId,
- "jobId": jobId,
- "ajax":"fetchExecJobSummary",
- "attempt": attempt
- };
+ refresh: function() {
+ this.model.refresh();
+ },
- $.ajax({
- url: requestURL,
- dataType: "json",
- data: requestData,
- error: function(data) {
- console.log(data);
- },
- success: function(data) {
- console.log("fetchSummary");
- if (data.error) {
- console.log(data.error);
- }
- else {
- self.renderCommandTable(data.commandProperties);
- self.renderJobTable(data.summaryTableHeaders, data.summaryTableData, "summary");
- self.renderJobTable(data.statTableHeaders, data.statTableData, "stats");
- self.renderHiveTable(data.hiveQueries, data.hiveQueryJobs);
- }
- }
- });
+ handleUpdate: function(evt) {
+ renderJobTable(jobSummary.summaryTableHeaders, jobSummary.summaryTableData, "summary");
+ renderJobTable(jobSummary.statTableHeaders, jobSummary.statTableData, "stats");
+ renderHiveTable(jobSummary.hiveQueries, jobSummary.hiveQueryJobs);
},
- renderCommandTable: function(commandProperties) {
- if (commandProperties) {
- var commandTable = $("#commandTable");
-
- for (var i = 0; i < commandProperties.length; i++) {
- var prop = commandProperties[i];
+ renderCommandTable: function() {
+ var commandTable = $("#commandTable");
+ var commandProperties = this.model.get("commandProperties");
+
+ for (var key in commandProperties) {
+ if (commandProperties.hasOwnProperty(key)) {
+ var value = commandProperties[key];
+ if (Array.isArray(value)) {
+ value = value.join("<br/>");
+ }
var tr = document.createElement("tr");
- var name = document.createElement("td");
- var value = document.createElement("td");
- $(name).html("<b>" + prop.first + "</b>");
- $(value).html(prop.second);
- $(tr).append(name);
- $(tr).append(value);
+ var keyTd = document.createElement("td");
+ var valueTd = document.createElement("td");
+ $(keyTd).html("<b>" + key + "</b>");
+ $(valueTd).html(value);
+ $(tr).append(keyTd);
+ $(tr).append(valueTd);
commandTable.append(tr);
}
}
+
+ $("#commandSummary").show();
},
- renderJobTable: function(headers, data, prefix) {
- if (headers) {
- // Add table headers
- var header = $("#" + prefix + "Header");
- var tr = document.createElement("tr");
- var i;
- for (i = 0; i < headers.length; i++) {
- var th = document.createElement("th");
- $(th).text(headers[i]);
- $(tr).append(th);
- }
- header.append(tr);
-
- // Add table body
- var body = $("#" + prefix + "Body");
- for (i = 0; i < data.length; i++) {
- tr = document.createElement("tr");
- var row = data[i];
- for (var j = 0; j < row.length; j++) {
- var td = document.createElement("td");
- if (j == 0) {
- // first column is a link to job details page
- $(td).html(row[j]);
- } else {
- $(td).text(row[j]);
- }
- $(tr).append(td);
+ renderPigTable: function(tableName, data) {
+ // Add table headers
+ var header = $("#" + tableName + "Header");
+ var tr = document.createElement("tr");
+ var i;
+ var headers = data[0];
+ var numColumns = headers.length;
+ for (i = 0; i < numColumns; i++) {
+ var th = document.createElement("th");
+ $(th).text(headers[i]);
+ $(tr).append(th);
+ }
+ header.append(tr);
+
+ // Add table body
+ var body = $("#" + tableName + "Body");
+ for (i = 1; i < data.length; i++) {
+ tr = document.createElement("tr");
+ var row = data[i];
+ for (var j = 0; j < numColumns; j++) {
+ var td = document.createElement("td");
+ if (j == 0) {
+ // first column is a link to job details page
+ $(td).html(row[j]);
+ } else {
+ $(td).text(row[j]);
}
- body.append(tr);
+ $(tr).append(td);
}
- } else {
- $("#job" + prefix).hide();
+ body.append(tr);
}
+
+ $("#pigJob" + tableName.charAt(0).toUpperCase() + tableName.substring(1)).show();
},
- renderHiveTable: function(queries, queryJobs) {
- if (queries) {
- // Set up table column headers
- var header = $("#hiveTableHeader");
- var tr = document.createElement("tr");
- var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
- var i;
-
- for (i = 0; i < headers.length; i++) {
- var th = document.createElement("th");
- $(th).text(headers[i]);
- $(tr).append(th);
- }
- header.append(tr);
+ renderPigSummaryTable: function() {
+ this.renderPigTable("summary", this.model.get("pigSummary"));
+ },
+ renderPigStatsTable: function() {
+ this.renderPigTable("stats", this.model.get("pigStats"));
+ },
+ renderHiveTable: function() {
+ var hiveSummary = this.model.get("hiveSummary");
+ var queries = hiveSummary.hiveQueries;
+ var queryJobs = hiveSummary.hiveQueryJobs;
+
+ // Set up table column headers
+ var header = $("#hiveTableHeader");
+ var tr = document.createElement("tr");
+ var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
+ var i;
+
+ for (i = 0; i < headers.length; i++) {
+ var th = document.createElement("th");
+ $(th).text(headers[i]);
+ $(tr).append(th);
+ }
+ header.html(tr);
+
+ // Construct table body
+ var oldBody = $("#hiveTableBody");
+ var newBody = $(document.createElement("tbody")).attr("id", "hiveTableBody");
+ for (i = 0; i < queries.length; i++) {
+ // new query
+ tr = document.createElement("tr");
+ var td = document.createElement("td");
+ $(td).html("<b>" + queries[i] + "</b>");
+ $(tr).append(td);
- // Construct table body
- var body = $("#hiveTableBody");
- for (i = 0; i < queries.length; i++) {
- // new query
- tr = document.createElement("tr");
- var td = document.createElement("td");
- $(td).html("<b>" + queries[i] + "</b>");
- $(tr).append(td);
+ var jobs = queryJobs[i];
+ if (jobs != null) {
+ // add first job for this query
+ var jobValues = jobs[0];
+ var j;
+ for (j = 0; j < jobValues.length; j++) {
+ td = document.createElement("td");
+ $(td).html(jobValues[j]);
+ $(tr).append(td);
+ }
+ newBody.append(tr);
- var jobs = queryJobs[i];
- if (jobs != null) {
- // add first job for this query
- var jobValues = jobs[0];
- var j;
- for (j = 0; j < jobValues.length; j++) {
- td = document.createElement("td");
- $(td).html(jobValues[j]);
- $(tr).append(td);
- }
- body.append(tr);
+ // add remaining jobs for this query
+ for (j = 1; j < jobs.length; j++) {
+ jobValues = jobs[j];
+ tr = document.createElement("tr");
+
+ // add empty cell for query column
+ td = document.createElement("td");
+ $(td).html(" ");
+ $(tr).append(td);
- // add remaining jobs for this query
- for (j = 1; j < jobs.length; j++) {
- jobValues = jobs[j];
- tr = document.createElement("tr");
-
- // add empty cell for query column
+ // add job values
+ for (var k = 0; k < jobValues.length; k++) {
td = document.createElement("td");
- $(td).html(" ");
+ $(td).html(jobValues[k]);
$(tr).append(td);
-
- // add job values
- for (var k = 0; k < jobValues.length; k++) {
- td = document.createElement("td");
- $(td).html(jobValues[k]);
- $(tr).append(td);
- }
- body.append(tr);
}
-
- } else {
- body.append(tr);
+ newBody.append(tr);
}
+
+ } else {
+ newBody.append(tr);
}
- } else {
- $("#hiveTable").hide();
}
+ oldBody.replaceWith(newBody);
+
+ $("#hiveJobSummary").show();
}
});
@@ -277,9 +214,6 @@ azkaban.JobTabView = Backbone.View.extend({
}
},
- render: function() {
- },
-
handleJobLogViewLinkClick: function() {
$('#jobSummaryViewLink').removeClass('active');
$('#jobSummaryView').hide();
@@ -312,23 +246,24 @@ var showDialog = function(title, message) {
}
$(function() {
- var selected;
- logModel = new azkaban.LogModel();
+ var logDataModel = new azkaban.LogDataModel();
+
jobLogView = new azkaban.JobLogView({
el: $('#jobLogView'),
- model: logModel
+ model: logDataModel
});
- summaryModel = new azkaban.SummaryModel();
jobSummaryView = new azkaban.JobSummaryView({
el: $('#jobSummaryView'),
- model: summaryModel
+ model: logDataModel
});
jobTabView = new azkaban.JobTabView({
el: $('#headertabs')
});
+ logDataModel.refresh();
+
if (window.location.hash) {
var hash = window.location.hash;
if (hash == '#joblog') {
src/web/js/azkaban.logdata.model.js 302(+302 -0)
diff --git a/src/web/js/azkaban.logdata.model.js b/src/web/js/azkaban.logdata.model.js
new file mode 100644
index 0000000..2e21386
--- /dev/null
+++ b/src/web/js/azkaban.logdata.model.js
@@ -0,0 +1,302 @@
+$.namespace('azkaban');
+
+azkaban.LogDataModel = Backbone.Model.extend({
+ TIMESTAMP_REGEX: /^.*? - /gm,
+
+ JOB_TRACKER_URL_REGEX: /https?:\/\/[-\w\.]+(?::\d+)?\/[\w\/\.]*\?\S+(job_\d{12}_\d{4,})\S*/,
+
+ // Command properties
+ COMMAND_START: "Command: ",
+ CLASSPATH_REGEX: /(?:-cp|-classpath)\s+(\S+)/g,
+ ENVIRONMENT_VARIABLES_REGEX: /-D(\S+)/g,
+ JVM_MEMORY_REGEX: /(-Xm\S+)/g,
+ PIG_PARAMS_REGEX: /-param\s+(\S+)/g,
+
+ JOB_TYPE_REGEX: /Building (\S+) job executor/,
+
+ PIG_JOB_SUMMARY_START: "HadoopVersion",
+ PIG_JOB_STATS_START: "Job Stats (time in seconds):",
+
+ HIVE_PARSING_START: "Parsing command: ",
+ HIVE_PARSING_END: "Parse Completed",
+ HIVE_NUM_MAP_REDUCE_JOBS_STRING: "Total MapReduce jobs = ",
+ HIVE_MAP_REDUCE_JOB_START: "Starting Job",
+ HIVE_MAP_REDUCE_JOBS_SUMMARY: "MapReduce Jobs Launched:",
+ HIVE_MAP_REDUCE_SUMMARY_REGEX: /Job (\d+): Map: (\d+) Reduce: (\d+) HDFS Read: (\d+) HDFS Write: (\d+)/,
+
+ initialize: function() {
+ this.set("offset", 0 );
+ this.set("logData", "");
+ this.on("change:logData", this.parseLogData);
+ },
+
+ refresh: function() {
+ var requestURL = contextURL + "/executor";
+ var finished = false;
+
+ var date = new Date();
+ var startTime = date.getTime();
+
+ while (!finished) {
+ var requestData = {
+ "execid": execId,
+ "jobId": jobId,
+ "ajax":"fetchExecJobLogs",
+ "offset": this.get("offset"),
+ "length": 50000,
+ "attempt": attempt
+ };
+
+ var self = this;
+
+ var successHandler = function(data) {
+ console.log("fetchLogs");
+ if (data.error) {
+ console.log(data.error);
+ finished = true;
+ }
+ else if (data.length == 0) {
+ finished = true;
+ }
+ else {
+ var date = new Date();
+ var endTime = date.getTime();
+ if ((endTime - startTime) > 10000) {
+ finished = true;
+ showDialog("Alert","The log is taking a long time to finish loading. Azkaban has stopped loading them. Please click Refresh to restart the load.");
+ }
+
+ self.set("offset", data.offset + data.length);
+ self.set("logData", self.get("logData") + data.data);
+ }
+ }
+
+ $.ajax({
+ url: requestURL,
+ type: "get",
+ async: false,
+ data: requestData,
+ dataType: "json",
+ error: function(data) {
+ console.log(data);
+ finished = true;
+ },
+ success: successHandler
+ });
+ }
+ },
+
+ parseLogData: function() {
+ var data = this.get("logData").replace(this.TIMESTAMP_REGEX, "");
+ var lines = data.split("\n");
+
+ if (this.parseCommand(lines)) {
+ this.parseJobTrackerUrls(lines);
+
+ var jobType = this.parseJobType(lines);
+ if (jobType.indexOf("pig") !== -1) {
+ this.parsePigTable(lines, "pigSummary", this.PIG_JOB_SUMMARY_START, "", 0);
+ this.parsePigTable(lines, "pigStats", this.PIG_JOB_STATS_START, "", 1);
+ } else if (jobType.indexOf("hive") !== -1) {
+ this.parseHiveQueries(lines);
+ }
+ }
+ },
+
+ parseCommand: function(lines) {
+ var commandStartIndex = -1;
+ var numLines = lines.length;
+ for (var i = 0; i < numLines; i++) {
+ if (lines[i].indexOf(this.COMMAND_START) === 0) {
+ commandStartIndex = i;
+ break;
+ }
+ }
+
+ if (commandStartIndex != -1) {
+ var commandProperties = {};
+
+ var command = lines[commandStartIndex].substring(this.COMMAND_START.length);
+ commandProperties.Command = command;
+
+ this.parseCommandProperty(command, commandProperties, "Classpath", this.CLASSPATH_REGEX, ':');
+ this.parseCommandProperty(command, commandProperties, "-D", this.ENVIRONMENT_VARIABLES_REGEX);
+ this.parseCommandProperty(command, commandProperties, "Memory Settings", this.JVM_MEMORY_REGEX);
+ this.parseCommandProperty(command, commandProperties, "Params", this.PIG_PARAMS_REGEX);
+
+ this.set("commandProperties", commandProperties);
+
+ return true;
+ }
+
+ return false;
+ },
+
+ parseCommandProperty: function(command, commandProperties, propertyName, regex, split) {
+ var results = [];
+ var match;
+ while (match = regex.exec(command)) {
+ if (split) {
+ results = results.concat(match[1].split(split));
+ } else {
+ results.push(match[1]);
+ }
+ }
+
+ if (results.length > 0) {
+ commandProperties[propertyName] = results;
+ }
+ },
+
+ parseJobTrackerUrls: function(lines) {
+ var jobTrackerUrls = {};
+ var jobTrackerUrlsOrdered = [];
+ var numLines = lines.length;
+ var match;
+ for (var i = 0; i < numLines; i++) {
+ if ((match = this.JOB_TRACKER_URL_REGEX.exec(lines[i])) && !jobTrackerUrls[match[1]]) {
+ jobTrackerUrls[match[1]] = match[0];
+ jobTrackerUrlsOrdered.push(match[0]);
+ }
+ }
+ this.set("jobTrackerUrls", jobTrackerUrls);
+ this.set("jobTrackerUrlsOrdered", jobTrackerUrlsOrdered);
+ },
+
+ parseJobType: function(lines) {
+ var numLines = lines.length;
+ var match;
+ for (var i = 0; numLines; i++) {
+ if (match = this.JOB_TYPE_REGEX.exec(lines[i])) {
+ return match[1];
+ }
+ }
+
+ return null;
+ },
+
+ parsePigTable: function(lines, tableName, startPattern, endPattern, linesToSkipAfterStart) {
+ var index = -1;
+ var numLines = lines.length;
+ for (var i = 0; i < numLines; i++) {
+ if (lines[i].indexOf(startPattern) === 0) {
+ index = i + linesToSkipAfterStart;
+ break;
+ }
+ }
+
+ if (index != -1) {
+ var table = [];
+ var line;
+ while ((line = lines[index]) !== endPattern) {
+ var columns = line.split("\t");
+ // If first column is a job id, make it a link to the job tracker.
+ if (this.get("jobTrackerUrls")[columns[0]]) {
+ columns[0] = "<a href='" + this.get("jobTrackerUrls")[columns[0]] + "'>" + columns[0] + "</a>";
+ }
+ table.push(columns);
+ index++;
+ }
+
+ this.set(tableName, table);
+ }
+ },
+
+ parseHiveQueries: function(lines) {
+ var hiveQueries = [];
+ var hiveQueryJobs = [];
+
+ var currMapReduceJob = 0;
+ var numLines = lines.length;
+ for (var i = 0; i < numLines;) {
+ var line = lines[i];
+ var parsingCommandIndex = line.indexOf(this.HIVE_PARSING_START);
+ if (parsingCommandIndex === -1) {
+ i++;
+ continue;
+ }
+
+ // parse query text, which could span multiple lines
+ var queryStartIndex = parsingCommandIndex + this.HIVE_PARSING_START.length;
+ var query = line.substring(queryStartIndex) + "<br/>";
+
+ i++;
+ while (i < numLines && (line = lines[i]).indexOf(this.HIVE_PARSING_END) === -1) {
+ query += line + "<br/>";
+ i++;
+ }
+ hiveQueries.push(query);
+ i++;
+
+ // parse the query's Map-Reduce jobs, if any.
+ var numMRJobs = 0;
+ while (i < numLines) {
+ line = lines[i];
+ if (line.contains(this.HIVE_NUM_MAP_REDUCE_JOBS_STRING)) {
+ // query involves map reduce jobs
+ var numMRJobs = parseInt(line.substring(this.HIVE_NUM_MAP_REDUCE_JOBS_STRING.length),10);
+ i++;
+
+ // get the map reduce jobs summary
+ while (i < numLines) {
+ line = lines[i];
+ if (line.contains(this.HIVE_MAP_REDUCE_JOBS_SUMMARY)) {
+ // job summary table found
+ i++;
+
+ var queryJobs = [];
+
+ var previousJob = -1;
+ var numJobsSeen = 0;
+ while (numJobsSeen < numMRJobs && i < numLines) {
+ line = lines[i];
+ var match;
+ if (match = this.HIVE_MAP_REDUCE_SUMMARY_REGEX.exec(line)) {
+ var currJob = parseInt(match[1], 10);
+ if (currJob === previousJob) {
+ i++;
+ continue;
+ }
+
+ var job = [];
+ job.push("<a href='" + this.get("jobTrackerUrlsOrdered")[currMapReduceJob++] + "'>" + currJob + "</a>");
+ job.push(match[2]);
+ job.push(match[3]);
+ job.push(match[4]);
+ job.push(match[5]);
+ queryJobs.push(job);
+ previousJob = currJob;
+ numJobsSeen++;
+ }
+ i++;
+ }
+
+ if (numJobsSeen === numMRJobs) {
+ hiveQueryJobs.push(queryJobs);
+ }
+
+ break;
+ }
+ i++;
+ }
+ break;
+ }
+ else if (line.contains(this.HIVE_PARSING_START)) {
+ if (numMRJobs === 0) {
+ hiveQueryJobs.push(null);
+ }
+ break;
+ }
+ i++;
+ }
+ continue;
+ }
+
+ if (hiveQueries.length > 0) {
+ this.set("hiveSummary", {
+ hiveQueries: hiveQueries,
+ hiveQueryJobs: hiveQueryJobs
+ });
+ }
+ }
+});