azkaban-memoizeit

Resolve merge conflicts.

1/13/2014 12:40:43 AM

Changes

scheduleTriggerMigration/file2Trigger/file2Trigger 5(+0 -5)

scheduleTriggerMigration/file2Trigger/lib/azkaban-2.2.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-dbcp-1.4.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-dbutils-1.5.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-jexl-2.1.1.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-logging-1.1.1.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-pool-1.6.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/file2trigger.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/jackson-core-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/jackson-mapper-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/joda-time-2.0.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/log4j-1.2.16.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/azkaban-2.1.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-dbcp-1.4.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-dbutils-1.5.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-io-2.4.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-pool-1.6.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/jackson-core-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/jackson-mapper-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/joda-time-2.0.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/log4j-1.2.16.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/schedule2file.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/schedule2file 7(+0 -7)

src/java/azkaban/utils/LogSummary.java 349(+0 -349)

Details

diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 54179fd..2e5de64 100644
--- a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -162,6 +162,13 @@ public class AzkabanProcess {
 	public void hardKill() {
 		checkStarted();
 		if (isRunning()) {
+			if (processId != 0 ) {
+				try {
+					Runtime.getRuntime().exec("kill -9 " + processId);
+				} catch (IOException e) {
+					logger.error("Kill attempt failed.", e);
+				}
+			}
 			process.destroy();
 		}
 	}
diff --git a/src/java/azkaban/migration/schedule2trigger/CommonParams.java b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
new file mode 100644
index 0000000..0408f51
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
@@ -0,0 +1,22 @@
+package azkaban.migration.schedule2trigger;
+
+public class CommonParams {
+	public static final String TYPE_FLOW_FINISH = "FlowFinish";
+	public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
+	public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
+
+	public static final String TYPE_JOB_FINISH = "JobFinish";
+	public static final String TYPE_JOB_SUCCEED = "JobSucceed";
+	public static final String TYPE_JOB_PROGRESS = "JobProgress";
+
+	public static final String INFO_DURATION = "Duration";
+	public static final String INFO_FLOW_NAME = "FlowName";
+	public static final String INFO_JOB_NAME = "JobName";
+	public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
+	public static final String INFO_EMAIL_LIST = "EmailList";
+
+	// always alert
+	public static final String ALERT_TYPE = "SlaAlertType";
+	public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
+	public static final String ACTION_ALERT = "SlaAlert";
+}
diff --git a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
new file mode 100644
index 0000000..6d1bd39
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
@@ -0,0 +1,256 @@
+package azkaban.migration.schedule2trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import azkaban.executor.ExecutionOptions;
+import static azkaban.migration.schedule2trigger.CommonParams.*;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.utils.Utils;
+
+public class Schedule2Trigger {
+	
+	private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
+	private static Props props;
+	private static File outputDir;
+	
+	public static void main(String[] args) throws Exception{
+		if(args.length < 1) {
+			printUsage();
+		}
+		
+		File confFile = new File(args[0]);
+		try {
+			logger.info("Trying to load config from " + confFile.getAbsolutePath());
+			props = loadAzkabanConfig(confFile);
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+		
+		try {
+			outputDir = File.createTempFile("schedules", null);
+			logger.info("Creating temp dir for dumping existing schedules.");
+			outputDir.delete();
+			outputDir.mkdir();
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+
+		try {
+			schedule2File();
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+		
+		try {
+			file2ScheduleTrigger();
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+		
+		logger.info("Uploaded all schedules. Removing temp dir.");
+		FileUtils.deleteDirectory(outputDir);
+		System.exit(0);
+	}
+	
+	private static Props loadAzkabanConfig(File confFile) throws IOException {
+		return new Props(null, confFile);
+	}
+	
+	private static void printUsage() {
+		System.out.println("Usage: schedule2Trigger PATH_TO_CONFIG_FILE");
+	}
+	
+	private static void schedule2File() throws Exception {
+		azkaban.migration.scheduler.ScheduleLoader scheduleLoader = new azkaban.migration.scheduler.JdbcScheduleLoader(props);
+		logger.info("Loading old schedule info from DB.");
+		List<azkaban.migration.scheduler.Schedule> schedules = scheduleLoader.loadSchedules();
+		for(azkaban.migration.scheduler.Schedule sched : schedules) {
+			writeScheduleFile(sched, outputDir);
+		}
+	}
+	
+	private static void writeScheduleFile(azkaban.migration.scheduler.Schedule sched, File outputDir) throws IOException {
+		String scheduleFileName = sched.getProjectName()+"-"+sched.getFlowName();
+		File outputFile = new File(outputDir, scheduleFileName);
+		outputFile.createNewFile();
+		Props props = new Props();
+		props.put("flowName", sched.getFlowName());
+		props.put("projectName", sched.getProjectName());
+		props.put("projectId", String.valueOf(sched.getProjectId()));
+		props.put("period", azkaban.migration.scheduler.Schedule.createPeriodString(sched.getPeriod()));
+		props.put("firstScheduleTimeLong", sched.getFirstSchedTime());
+		props.put("timezone", sched.getTimezone().getID());
+		props.put("submitUser", sched.getSubmitUser());
+		props.put("submitTimeLong", sched.getSubmitTime());
+		props.put("nextExecTimeLong", sched.getNextExecTime());
+		
+		ExecutionOptions executionOptions = sched.getExecutionOptions();
+		if(executionOptions != null) {
+			props.put("executionOptionsObj", JSONUtils.toJSON(executionOptions.toObject()));
+		}
+		
+		azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
+		if(slaOptions != null) {
+			
+			List<Map<String, Object>> settingsObj = new ArrayList<Map<String,Object>>();
+			List<azkaban.migration.sla.SLA.SlaSetting> settings = slaOptions.getSettings();
+			for(azkaban.migration.sla.SLA.SlaSetting set : settings) {
+				Map<String, Object> setObj = new HashMap<String, Object>();
+				String setId = set.getId();
+				azkaban.migration.sla.SLA.SlaRule rule = set.getRule();
+				Map<String, Object> info = new HashMap<String, Object>();
+				info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule.createPeriodString(set.getDuration()));
+				info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());
+				List<String> actionsList = new ArrayList<String>();
+				for(azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
+					if(act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
+						actionsList.add(ACTION_ALERT);
+						info.put(ALERT_TYPE, "email");
+					} else if(act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
+						actionsList.add(ACTION_CANCEL_FLOW);
+					}
+				}
+				setObj.put("actions", actionsList);
+				if(setId.equals("")) {
+					info.put(INFO_FLOW_NAME, sched.getFlowName());
+					if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+						setObj.put("type", TYPE_FLOW_FINISH);
+					} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+						setObj.put("type", TYPE_FLOW_SUCCEED);
+					}
+				} else {
+					info.put(INFO_JOB_NAME, setId);
+					if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+						setObj.put("type", TYPE_JOB_FINISH);
+					} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+						setObj.put("type", TYPE_JOB_SUCCEED);
+					}
+				}
+				setObj.put("info", info);
+				settingsObj.add(setObj);
+			}
+			
+			props.put("slaOptionsObj", JSONUtils.toJSON(settingsObj));
+		}
+		props.storeLocal(outputFile);
+	}
+
+	private static void file2ScheduleTrigger() throws Exception {
+		
+		TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
+		for(File scheduleFile : outputDir.listFiles()) {
+			logger.info("Trying to load schedule from " + scheduleFile.getAbsolutePath());
+			if(scheduleFile.isFile()) {
+				Props schedProps = new Props(null, scheduleFile);
+				String flowName = schedProps.getString("flowName");
+				String projectName = schedProps.getString("projectName");
+				int projectId = schedProps.getInt("projectId");
+				long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
+//				DateTime firstSchedTime = new DateTime(firstSchedTimeLong);
+				String timezoneId = schedProps.getString("timezone");
+				DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+				ReadablePeriod period = Utils.parsePeriodString(schedProps.getString("period"));
+//				DateTime lastModifyTime = DateTime.now();
+				long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
+//				DateTime nextExecTime = new DateTime(nextExecTimeLong);
+				long submitTimeLong = schedProps.getLong("submitTimeLong");
+//				DateTime submitTime = new DateTime(submitTimeLong);
+				String submitUser = schedProps.getString("submitUser");
+				ExecutionOptions executionOptions = null;
+				if(schedProps.containsKey("executionOptionsObj")) {
+					String executionOptionsObj = schedProps.getString("executionOptionsObj");
+					executionOptions = ExecutionOptions.createFromObject(JSONUtils.parseJSONFromString(executionOptionsObj));
+				} else {
+					executionOptions = new ExecutionOptions();
+				}
+				List<azkaban.sla.SlaOption> slaOptions = null;
+				if(schedProps.containsKey("slaOptionsObj")) {
+					slaOptions = new ArrayList<azkaban.sla.SlaOption>();
+					List<Map<String, Object>> settingsObj = (List<Map<String, Object>>) JSONUtils.parseJSONFromString(schedProps.getString("slaOptionsObj"));
+					for(Map<String, Object> sla : settingsObj) {
+						String type = (String) sla.get("type");
+						Map<String, Object> info = (Map<String, Object>) sla.get("info");
+						List<String> actions = (List<String>) sla.get("actions");
+						azkaban.sla.SlaOption slaOption = new azkaban.sla.SlaOption(type, actions, info);
+						slaOptions.add(slaOption);
+					}
+				}
+				
+				azkaban.scheduler.Schedule schedule = new azkaban.scheduler.Schedule(-1, projectId, projectName, flowName, "ready", firstSchedTimeLong, timezone, period, DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong, submitUser, executionOptions, slaOptions);
+				Trigger t = scheduleToTrigger(schedule);
+				logger.info("Ready to insert trigger " + t.getDescription());
+				triggerLoader.addTrigger(t);
+				
+			}
+			
+		}
+	}
+	
+	
+	private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
+		
+		Condition triggerCondition = createTimeTriggerCondition(s);
+		Condition expireCondition = createTimeExpireCondition(s);
+		List<TriggerAction> actions = createActions(s);
+		Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), azkaban.scheduler.ScheduleManager.triggerSource, triggerCondition, expireCondition, actions);
+		if(s.isRecurring()) {
+			t.setResetOnTrigger(true);
+		}
+		return t;
+	}
+	
+	private static List<TriggerAction> createActions (azkaban.scheduler.Schedule s) {
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
+		actions.add(executeAct);
+		
+		return actions;
+	}
+	
+	private static Condition createTimeTriggerCondition (azkaban.scheduler.Schedule s) {
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		checkers.put(checker.getId(), checker);
+		String expr = checker.getId() + ".eval()";
+		Condition cond = new Condition(checkers, expr);
+		return cond;
+	}
+	
+	// if failed to trigger, auto expire?
+	private static Condition createTimeExpireCondition (azkaban.scheduler.Schedule s) {
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		checkers.put(checker.getId(), checker);
+		String expr = checker.getId() + ".eval()";
+		Condition cond = new Condition(checkers, expr);
+		return cond;
+	}
+
+}
diff --git a/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
new file mode 100644
index 0000000..bcef168
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+@Deprecated
+public class JdbcScheduleLoader implements ScheduleLoader {
+
+	private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
+	
+	public static enum EncodingType {
+		PLAIN(1), GZIP(2);
+
+		private int numVal;
+
+		EncodingType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static EncodingType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return PLAIN;
+			case 2:
+				return GZIP;
+			default:
+				return PLAIN;
+			}
+		}
+	}
+	
+	private DataSource dataSource;
+	private EncodingType defaultEncodingType = EncodingType.GZIP;
+	
+	private static final String scheduleTableName = "schedules";
+
+	private static String SELECT_ALL_SCHEDULES =
+			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
+	
+	private static String INSERT_SCHEDULE = 
+			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+	
+	private static String REMOVE_SCHEDULE_BY_KEY = 
+			"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
+	
+	private static String UPDATE_SCHEDULE_BY_KEY = 
+			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
+	
+	private static String UPDATE_NEXT_EXEC_TIME = 
+			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+
+	private Connection getConnection() throws ScheduleManagerException {
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new ScheduleManagerException("Error getting DB connection.", e);
+		}
+		
+		return connection;
+	}
+	
+	public EncodingType getDefaultEncodingType() {
+		return defaultEncodingType;
+	}
+
+	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+		this.defaultEncodingType = defaultEncodingType;
+	}
+	
+	public JdbcScheduleLoader(Props props) {
+		String databaseType = props.getString("database.type");
+		
+		if (databaseType.equals("mysql")) {
+			int port = props.getInt("mysql.port");
+			String host = props.getString("mysql.host");
+			String database = props.getString("mysql.database");
+			String user = props.getString("mysql.user");
+			String password = props.getString("mysql.password");
+			int numConnections = props.getInt("mysql.numconnections");
+			
+			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		}
+	}
+
+	@Override
+	public List<Schedule> loadSchedules() throws ScheduleManagerException {
+		logger.info("Loading all schedules from db.");
+		Connection connection = getConnection();
+
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();
+	
+		List<Schedule> schedules;
+		
+		try {
+			schedules = runner.query(connection, SELECT_ALL_SCHEDULES, handler);
+		} catch (SQLException e) {
+			logger.error(SELECT_ALL_SCHEDULES + " failed.");
+
+			DbUtils.closeQuietly(connection);
+			throw new ScheduleManagerException("Loading schedules from db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+		logger.info("Now trying to update the schedules");
+		
+		// filter the schedules
+        Iterator<Schedule> scheduleIterator = schedules.iterator();
+        while (scheduleIterator.hasNext()) {
+            Schedule sched = scheduleIterator.next();
+			if(!sched.updateTime()) {
+				logger.info("Schedule " + sched.getScheduleName() + " was scheduled before azkaban start, skipping it.");
+				scheduleIterator.remove();
+				removeSchedule(sched);
+			}
+			else {
+				logger.info("Recurring schedule, need to update next exec time");
+				try {
+					updateNextExecTime(sched);
+				} catch (Exception e) {
+					e.printStackTrace();
+					throw new ScheduleManagerException("Update next execution time failed.", e);
+				} 
+				logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
+			}
+		}
+		
+		
+				
+		logger.info("Loaded " + schedules.size() + " schedules.");
+		
+		return schedules;
+	}
+
+	@Override
+	public void removeSchedule(Schedule s) throws ScheduleManagerException {		
+		logger.info("Removing schedule " + s.getScheduleName() + " from db.");
+
+		QueryRunner runner = new QueryRunner(dataSource);
+	
+		try {
+			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
+			if (removes == 0) {
+				throw new ScheduleManagerException("No schedule has been removed.");
+			}
+		} catch (SQLException e) {
+			logger.error(REMOVE_SCHEDULE_BY_KEY + " failed.");
+			throw new ScheduleManagerException("Remove schedule " + s.getScheduleName() + " from db failed. ", e);
+		}
+	}
+	
+	
+	public void insertSchedule(Schedule s) throws ScheduleManagerException {
+		logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
+		insertSchedule(s, defaultEncodingType);
+	}
+
+	public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+		
+		String json = JSONUtils.toJSON(s.optionsToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
+		}
+		
+		QueryRunner runner = new QueryRunner(dataSource);
+		try {
+			int inserts =  runner.update( 
+					INSERT_SCHEDULE, 
+					s.getProjectId(),
+					s.getProjectName(),
+					s.getFlowName(), 
+					s.getStatus(), 
+					s.getFirstSchedTime(), 
+					s.getTimezone().getID(), 
+					Schedule.createPeriodString(s.getPeriod()), 
+					s.getLastModifyTime(), 
+					s.getNextExecTime(), 
+					s.getSubmitTime(), 
+					s.getSubmitUser(),
+					encType.getNumVal(),
+					data);
+			if (inserts == 0) {
+				throw new ScheduleManagerException("No schedule has been inserted.");
+			}
+		} catch (SQLException e) {
+			logger.error(INSERT_SCHEDULE + " failed.");
+			throw new ScheduleManagerException("Insert schedule " + s.getScheduleName() + " into db failed. ", e);
+		}
+	}
+	
+	@Override
+	public void updateNextExecTime(Schedule s) throws ScheduleManagerException 
+	{
+		logger.info("Update schedule " + s.getScheduleName() + " into db. ");
+		Connection connection = getConnection();
+		QueryRunner runner = new QueryRunner();
+		try {
+			
+			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName()); 
+		} catch (SQLException e) {
+			e.printStackTrace();
+			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
+			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+	
+	@Override
+	public void updateSchedule(Schedule s) throws ScheduleManagerException {
+		logger.info("Updating schedule " + s.getScheduleName() + " into db.");
+		updateSchedule(s, defaultEncodingType);
+	}
+		
+	public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+		String json = JSONUtils.toJSON(s.optionsToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
+		}
+
+		QueryRunner runner = new QueryRunner(dataSource);
+	
+		try {
+			int updates =  runner.update( 
+					UPDATE_SCHEDULE_BY_KEY, 
+					s.getStatus(), 
+					s.getFirstSchedTime(), 
+					s.getTimezone().getID(), 
+					Schedule.createPeriodString(s.getPeriod()), 
+					s.getLastModifyTime(), 
+					s.getNextExecTime(), 
+					s.getSubmitTime(), 
+					s.getSubmitUser(), 	
+					encType.getNumVal(),
+					data,
+					s.getProjectId(), 
+					s.getFlowName());
+			if (updates == 0) {
+				throw new ScheduleManagerException("No schedule has been updated.");
+			}
+		} catch (SQLException e) {
+			logger.error(UPDATE_SCHEDULE_BY_KEY + " failed.");
+			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+		}
+	}
+
+	public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
+		@Override
+		public List<Schedule> handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return Collections.<Schedule>emptyList();
+			}
+			
+			ArrayList<Schedule> schedules = new ArrayList<Schedule>();
+			do {
+				int projectId = rs.getInt(1);
+				String projectName = rs.getString(2);
+				String flowName = rs.getString(3);
+				String status = rs.getString(4);
+				long firstSchedTime = rs.getLong(5);
+				DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
+				ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
+				long lastModifyTime = rs.getLong(8);
+				long nextExecTime = rs.getLong(9);
+				long submitTime = rs.getLong(10);
+				String submitUser = rs.getString(11);
+				int encodingType = rs.getInt(12);
+				byte[] data = rs.getBytes(13);
+				
+				Object optsObj = null;
+				if (data != null) {
+					EncodingType encType = EncodingType.fromInteger(encodingType);
+
+					try {
+						// Convoluted way to inflate strings. Should find common package or helper function.
+						if (encType == EncodingType.GZIP) {
+							// Decompress the sucker.
+							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+							optsObj = JSONUtils.parseJSONFromString(jsonString);
+						}
+						else {
+							String jsonString = new String(data, "UTF-8");
+							optsObj = JSONUtils.parseJSONFromString(jsonString);
+						}	
+					} catch (IOException e) {
+						throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
+					}
+				}
+				
+				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+				if (optsObj != null) {
+					s.createAndSetScheduleOptions(optsObj);
+				}
+				
+				schedules.add(s);
+			} while (rs.next());
+			
+			return schedules;
+		}
+		
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/Schedule.java b/src/java/azkaban/migration/scheduler/Schedule.java
new file mode 100644
index 0000000..9490243
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/Schedule.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.migration.sla.SlaOptions;
+import azkaban.utils.Pair;
+
+@Deprecated
+public class Schedule{
+	
+//	private long projectGuid;
+//	private long flowGuid;
+	
+//	private String scheduleId;
+	
+	private int projectId;
+	private String projectName;
+	private String flowName;
+	private long firstSchedTime;
+	private DateTimeZone timezone;
+	private long lastModifyTime;
+	private ReadablePeriod period;
+	private long nextExecTime;
+	private String submitUser;
+	private String status;
+	private long submitTime;
+	
+	private ExecutionOptions executionOptions;
+	private SlaOptions slaOptions;
+	
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						DateTimeZone timezone,
+						ReadablePeriod period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser
+						) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = timezone;
+		this.lastModifyTime = lastModifyTime;
+		this.period = period;
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.executionOptions = null;
+		this.slaOptions = null;
+	}
+
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						String timezoneId,
+						String period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser,
+						ExecutionOptions executionOptions,
+						SlaOptions slaOptions
+			) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = DateTimeZone.forID(timezoneId);
+		this.lastModifyTime = lastModifyTime;
+		this.period = parsePeriodString(period);
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.executionOptions = executionOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						DateTimeZone timezone,
+						ReadablePeriod period,
+						long lastModifyTime,
+						long nextExecTime,
+						long submitTime,
+						String submitUser,
+						ExecutionOptions executionOptions,
+						SlaOptions slaOptions
+						) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = timezone;
+		this.lastModifyTime = lastModifyTime;
+		this.period = period;
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.executionOptions = executionOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public ExecutionOptions getExecutionOptions() {
+		return executionOptions;
+	}
+
+	public void setFlowOptions(ExecutionOptions executionOptions) {
+		this.executionOptions = executionOptions;
+	}
+
+	public SlaOptions getSlaOptions() {
+		return slaOptions;
+	}
+
+	public void setSlaOptions(SlaOptions slaOptions) {
+		this.slaOptions = slaOptions;
+	}
+
+	public String getScheduleName() {
+		return projectName + "." + flowName + " (" + projectId + ")";
+	}
+	
+	public String toString() {
+		return projectName + "." + flowName + " (" + projectId + ")" + " to be run at (starting) " + 
+				new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
+	}
+	
+	public Pair<Integer, String> getScheduleId() {
+		return new Pair<Integer, String>(getProjectId(), getFlowName());
+	}
+	
+	public int getProjectId() {
+		return projectId;
+	}
+
+	public String getProjectName() {
+		return projectName;
+	}
+
+	public String getFlowName() {
+		return flowName;
+	}
+
+	public long getFirstSchedTime() {
+		return firstSchedTime;
+	}
+
+	public DateTimeZone getTimezone() {
+		return timezone;
+	}
+
+	public long getLastModifyTime() {
+		return lastModifyTime;
+	}
+
+	public ReadablePeriod getPeriod() {
+		return period;
+	}
+
+	public long getNextExecTime() {
+		return nextExecTime;
+	}
+
+	public String getSubmitUser() {
+		return submitUser;
+	}
+
+	public String getStatus() {
+		return status;
+	}
+
+	public long getSubmitTime() {
+		return submitTime;
+	}
+
+	public boolean updateTime() {
+		if (new DateTime(nextExecTime).isAfterNow()) {
+			return true;
+		}
+
+		if (period != null) {
+			DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
+
+			this.nextExecTime = nextTime.getMillis();
+			return true;
+		}
+
+		return false;
+	}
+	
+	private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
+		DateTime now = new DateTime();
+		DateTime date = new DateTime(scheduleTime).withZone(timezone);
+		int count = 0;
+		while (!now.isBefore(date)) {
+			if (count > 100000) {
+				throw new IllegalStateException(
+						"100000 increments of period did not get to present time.");
+			}
+
+			if (period == null) {
+				break;
+			} else {
+				date = date.plus(period);
+			}
+
+			count += 1;
+		}
+
+		return date;
+	}
+
+	public static ReadablePeriod parsePeriodString(String periodStr) {
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0,
+				periodStr.length() - 1));
+		switch (periodUnit) {
+		case 'M':
+			period = Months.months(periodInt);
+			break;
+		case 'w':
+			period = Weeks.weeks(periodInt);
+			break;
+		case 'd':
+			period = Days.days(periodInt);
+			break;
+		case 'h':
+			period = Hours.hours(periodInt);
+			break;
+		case 'm':
+			period = Minutes.minutes(periodInt);
+			break;
+		case 's':
+			period = Seconds.seconds(periodInt);
+			break;
+		default:
+			throw new IllegalArgumentException("Invalid schedule period unit '"
+					+ periodUnit + "'");
+		}
+
+		return period;
+	}
+
+	public static String createPeriodString(ReadablePeriod period) {
+		String periodStr = "n";
+
+		if (period == null) {
+			return "n";
+		}
+
+		if (period.get(DurationFieldType.months()) > 0) {
+			int months = period.get(DurationFieldType.months());
+			periodStr = months + "M";
+		} else if (period.get(DurationFieldType.weeks()) > 0) {
+			int weeks = period.get(DurationFieldType.weeks());
+			periodStr = weeks + "w";
+		} else if (period.get(DurationFieldType.days()) > 0) {
+			int days = period.get(DurationFieldType.days());
+			periodStr = days + "d";
+		} else if (period.get(DurationFieldType.hours()) > 0) {
+			int hours = period.get(DurationFieldType.hours());
+			periodStr = hours + "h";
+		} else if (period.get(DurationFieldType.minutes()) > 0) {
+			int minutes = period.get(DurationFieldType.minutes());
+			periodStr = minutes + "m";
+		} else if (period.get(DurationFieldType.seconds()) > 0) {
+			int seconds = period.get(DurationFieldType.seconds());
+			periodStr = seconds + "s";
+		}
+
+		return periodStr;
+	}
+	
+	
+	public Map<String,Object> optionsToObject() {
+		if(executionOptions != null || slaOptions != null) {
+			HashMap<String, Object> schedObj = new HashMap<String, Object>();
+			
+			if(executionOptions != null) {
+				schedObj.put("executionOptions", executionOptions.toObject());
+			}
+			if(slaOptions != null) {
+				schedObj.put("slaOptions", slaOptions.toObject());
+			}
+	
+			return schedObj;
+		}
+		return null;
+	}
+	
+	public void createAndSetScheduleOptions(Object obj) {
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
+		if (schedObj.containsKey("executionOptions")) {
+			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
+			this.executionOptions = execOptions;
+		}
+		else if (schedObj.containsKey("flowOptions")){
+			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
+			this.executionOptions = execOptions;
+			execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+		}
+		else {
+			this.executionOptions = new ExecutionOptions();
+			this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+		}
+
+		if (schedObj.containsKey("slaOptions")) {
+			SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
+			this.slaOptions = slaOptions;
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleLoader.java b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
new file mode 100644
index 0000000..6511d9c
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.List;
+
+@Deprecated
+public interface ScheduleLoader {
+	
+	public void insertSchedule(Schedule s) throws ScheduleManagerException;
+	
+	public void updateSchedule(Schedule s) throws ScheduleManagerException;
+	
+	public List<Schedule> loadSchedules() throws ScheduleManagerException;
+	
+	public void removeSchedule(Schedule s) throws ScheduleManagerException;
+
+	public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleManagerException.java b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
new file mode 100644
index 0000000..f0f6705
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+@Deprecated
+public class ScheduleManagerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public ScheduleManagerException(String message) {
+		super(message);
+	}
+	
+	public ScheduleManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/sla/SLA.java b/src/java/azkaban/migration/sla/SLA.java
new file mode 100644
index 0000000..0e75965
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.migration.scheduler.Schedule;
+
+@Deprecated
+public class SLA {
+
+	public static enum SlaRule {
+		SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+		private int numVal;
+
+		SlaRule(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaRule fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return SUCCESS;
+			case 2:
+				return FINISH;
+			case 3:
+				return WAITANDCHECKJOB;
+			default:
+				return SUCCESS;
+			}
+		}
+	}
+	
+	public static enum SlaAction {
+		EMAIL(1), KILL(2);
+
+		private int numVal;
+
+		SlaAction(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaAction fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return EMAIL;
+			case 2:
+				return KILL;
+			default:
+				return EMAIL;
+			}
+		}
+	}
+	
+	public static class SlaSetting {
+		public String getId() {
+			return id;
+		}
+		public void setId(String id) {
+			this.id = id;
+		}
+		public ReadablePeriod getDuration() {
+			return duration;
+		}
+		public void setDuration(ReadablePeriod duration) {
+			this.duration = duration;
+		}
+		public SlaRule getRule() {
+			return rule;
+		}
+		public void setRule(SlaRule rule) {
+			this.rule = rule;
+		}
+		public List<SlaAction> getActions() {
+			return actions;
+		}
+		public void setActions(List<SlaAction> actions) {
+			this.actions = actions;
+		}
+		
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("id", id);
+			obj.put("duration", Schedule.createPeriodString(duration));
+//			List<String> rulesObj = new ArrayList<String>();
+//			for(SlaRule rule : rules) {
+//				rulesObj.add(rule.toString());
+//			}
+//			obj.put("rules", rulesObj);
+			obj.put("rule", rule.toString());
+			List<String> actionsObj = new ArrayList<String>();
+			for(SlaAction act : actions) {
+				actionsObj.add(act.toString());
+			}
+			obj.put("actions", actionsObj);
+			return obj;
+		}
+		
+		@SuppressWarnings("unchecked")
+		public static SlaSetting fromObject(Object obj) {
+			Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+			String subId = (String) slaObj.get("id");
+			ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+//			List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+//			List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+//			for(String rule : rulesObj) {
+//				slaRules.add(SlaRule.valueOf(rule));
+//			}
+			SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+			List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+			List<SlaAction> slaActs = new ArrayList<SlaAction>();
+			for(String act : actsObj) {
+				slaActs.add(SlaAction.valueOf(act));
+			}
+			
+			SlaSetting ret = new SlaSetting();
+			ret.setId(subId);
+			ret.setDuration(dur);
+			ret.setRule(slaRule);
+			ret.setActions(slaActs);
+			return ret;
+		}
+		
+		private String id;
+		private ReadablePeriod duration;
+		private SlaRule rule = SlaRule.SUCCESS;
+		private List<SlaAction> actions;
+	}
+	
+	private int execId;
+	private String jobName;
+	private DateTime checkTime;
+	private List<String> emails;
+	private List<SlaAction> actions;
+	private List<SlaSetting> jobSettings;
+	private SlaRule rule;
+	
+	public SLA(
+			int execId,
+			String jobName,
+			DateTime checkTime,
+			List<String> emails,
+			List<SlaAction> slaActions,
+			List<SlaSetting> jobSettings,
+			SlaRule slaRule
+	) {
+		this.execId = execId;
+		this.jobName = jobName;
+		this.checkTime = checkTime;
+		this.emails = emails;
+		this.actions = slaActions;
+		this.jobSettings = jobSettings;
+		this.rule = slaRule;
+	}
+
+	public int getExecId() {
+		return execId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public DateTime getCheckTime() {
+		return checkTime;
+	}
+
+	public List<String> getEmails() {
+		return emails;
+	}
+
+	public List<SlaAction> getActions() {
+		return actions;
+	}
+	
+	public List<SlaSetting> getJobSettings() {
+		return jobSettings;
+	}
+
+	public SlaRule getRule() {
+		return rule;
+	}
+	
+	public String toString() {
+		return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+	}
+	
+	public Map<String,Object> optionToObject() {
+		HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+		slaObj.put("emails", emails);
+//		slaObj.put("rule", rule.toString());
+
+		List<String> actionsObj = new ArrayList<String>();
+		for(SlaAction act : actions) {
+			actionsObj.add(act.toString());
+		}
+		slaObj.put("actions", actionsObj);
+		
+		if(jobSettings != null && jobSettings.size() > 0) {
+			List<Object> settingsObj = new ArrayList<Object>();
+			for(SlaSetting set : jobSettings) {
+				settingsObj.add(set.toObject());
+			}
+			slaObj.put("jobSettings", settingsObj);
+		}
+
+		return slaObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+		
+		HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+		List<String> emails = (List<String>)slaObj.get("emails");
+//		SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+		List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+		List<SlaAction> slaActs = new ArrayList<SlaAction>();
+		for(String act : actsObj) {
+			slaActs.add(SlaAction.valueOf(act));
+		}
+		List<SlaSetting> jobSets = null;
+		if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+			jobSets = new ArrayList<SLA.SlaSetting>();
+			for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+				SlaSetting jobSet = SlaSetting.fromObject(set);
+				jobSets.add(jobSet);
+			}
+		}
+		
+		return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+	}
+
+	public void setCheckTime(DateTime time) {
+		this.checkTime = time;
+	}
+
+}
diff --git a/src/java/azkaban/migration/sla/SlaOptions.java b/src/java/azkaban/migration/sla/SlaOptions.java
new file mode 100644
index 0000000..f7b9d49
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SlaOptions.java
@@ -0,0 +1,52 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.migration.sla.SLA.SlaSetting;
+
+@Deprecated
+public class SlaOptions {
+
+	public List<String> getSlaEmails() {
+		return slaEmails;
+	}
+	public void setSlaEmails(List<String> slaEmails) {
+		this.slaEmails = slaEmails;
+	}
+	public List<SlaSetting> getSettings() {
+		return settings;
+	}
+	public void setSettings(List<SlaSetting> settings) {
+		this.settings = settings;
+	}
+	private List<String> slaEmails;
+	private List<SlaSetting> settings;
+	public Object toObject() {
+		Map<String, Object> obj = new HashMap<String, Object>();
+		obj.put("slaEmails", slaEmails);
+		List<Object> slaSettings = new ArrayList<Object>();
+		for(SlaSetting s : settings) {
+			slaSettings.add(s.toObject());
+		}
+		obj.put("settings", slaSettings);
+		return obj;
+	}
+	@SuppressWarnings("unchecked")
+	public static SlaOptions fromObject(Object object) {
+		if(object != null) {
+			SlaOptions slaOptions = new SlaOptions();
+			Map<String, Object> obj = (HashMap<String, Object>) object;
+			slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+			List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+			for(Object set: (List<Object>)obj.get("settings")) {
+				slaSets.add(SlaSetting.fromObject(set));
+			}
+			slaOptions.setSettings(slaSets);
+			return slaOptions;
+		}
+		return null;			
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 51c0dab..a0adf57 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -45,7 +45,6 @@ import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.LogSummary;
 import azkaban.utils.JSONUtils;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
@@ -133,12 +132,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("fetchExecJobLogs")) {
 					ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
 				}
-				else if (ajaxName.equals("fetchExecJobSummary")) {
-					ajaxFetchJobSummary(req, resp, ret, session.getUser(), exFlow);
-				}
         else if (ajaxName.equals("fetchExecJobStats")) {
           ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
         }
 				else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
@@ -459,53 +461,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-	/**
-	 * Gets the job summary.
-	 * 
-	 * @param req
-	 * @param resp
-	 * @param user
-	 * @param exFlow
-	 * @throws ServletException
-	 */
-	private void ajaxFetchJobSummary(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
-		if (project == null) {
-			return;
-		}
-		
-		String jobId = this.getParam(req, "jobId");
-		resp.setCharacterEncoding("utf-8");
-
-		try {
-			ExecutableNode node = exFlow.getExecutableNode(jobId);
-			if (node == null) {
-				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
-				return;
-			}
-			
-			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
-			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, 0, Integer.MAX_VALUE, attempt);
-			
-			LogSummary summary = new LogSummary(data);
-			ret.put("commandProperties", summary.getCommandProperties());
-			
-			String jobType = summary.getJobType();
-			
-			if (jobType.contains("pig")) {
-				ret.put("summaryTableHeaders", summary.getPigSummaryTableHeaders());
-				ret.put("summaryTableData", summary.getPigSummaryTableData());
-				ret.put("statTableHeaders", summary.getPigStatTableHeaders());
-				ret.put("statTableData", summary.getPigStatTableData());
-			} else if (jobType.contains("hive")) {
-				ret.put("hiveQueries", summary.getHiveQueries());
-				ret.put("hiveQueryJobs", summary.getHiveQueryJobs());
-			}
-		} catch (ExecutorManagerException e) {
-			throw new ServletException(e);
-		}
-	}
-	
   private void ajaxFetchJobStats(
       HttpServletRequest req, 
       HttpServletResponse resp, 
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 9b1cd51..4f8aa93 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -22,6 +22,7 @@
 #parse("azkaban/webapp/servlet/velocity/javascript.vm")
 
 		<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.logdata.model.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.jobdetails.view.js"></script>
 		<script type="text/javascript">
 			var contextURL = "${context}";
@@ -110,44 +111,56 @@
 	## Job Summary
 
     <div class="container-full" id="jobSummaryView">
-			<div class="row">
-				<div class="col-xs-12">
-					<table id="commandTable" class="table table-striped table-bordered table-hover">
-					</table>
-				
-					<div class="panel panel-default" id="jobsummary">
-						<div class="panel-heading">Job Summary</div>
-						<table class="table table-striped table-bordered table-hover">
-							<thead id="summaryHeader">
-							</thead>
-							<tbody id="summaryBody">
-							</tbody>
-						</table>
-					</div>
-				
-					<div class="panel panel-default" id="jobstats">
-						<div class="panel-heading">Job Stats</div>
-            <div class="panel-body panel-body-stats">
-              <table class="table table-striped table-bordered table-hover table-condensed">
-                <thead id="statsHeader">
+      <div class="row">
+        <div class="col-lg-12">
+          <div class="panel panel-default">
+            <div class="panel-heading">
+              <div class="pull-left">
+                <button type="button" id="updateSummaryBtn" class="btn btn-xs btn-default">Refresh</button>
+              </div>
+              &nbsp;
+            </div>
+
+            <div class="panel panel-default" id="commandSummary">
+              <div class="panel-heading">Command Summary</div>
+              <table id="commandTable" class="table table-striped table-bordered table-hover">
+              </table>
+            </div>
+          
+            <div class="panel panel-default" id="pigJobSummary">
+              <div class="panel-heading">Pig Job Summary</div>
+              <table class="table table-striped table-bordered table-hover">
+                <thead id="summaryHeader">
                 </thead>
-                <tbody id="statsBody">
+                <tbody id="summaryBody">
                 </tbody>
               </table>
             </div>
-					</div>
-					
-					<div class="panel panel-default" id="hiveTable">
-						<div class="panel-heading">Job Summary</div>
-						<table class="table table-striped table-bordered table-hover">
-							<thead id="hiveTableHeader">
-							</thead>
-							<tbody id="hiveTableBody">
-							</tbody>
-						</table>
-					</div>
-				</div>
-			</div>
+          
+            <div class="panel panel-default" id="pigJobStats">
+              <div class="panel-heading">Pig Job Stats</div>
+                <div class="panel-body panel-body-stats">
+                    <table class="table table-striped table-bordered table-hover table-condensed">
+                      <thead id="statsHeader">
+                      </thead>
+                      <tbody id="statsBody">
+                      </tbody>
+                    </table>
+                </div>
+            </div>
+
+            <div class="panel panel-default" id="hiveJobSummary">
+              <div class="panel-heading">Hive Job Summary</div>
+              <table class="table table-striped table-bordered table-hover"  id="hiveTable">
+                <thead id="hiveTableHeader">
+                </thead>
+                <tbody id="hiveTableBody">
+                </tbody>
+              </table>
+            </div>
+          </div>
+        </div>
+      </div>
     </div>
 			
 	## Error message message dialog.
diff --git a/src/package/webserver/bin/schedule2trigger.sh b/src/package/webserver/bin/schedule2trigger.sh
new file mode 100644
index 0000000..1178cf3
--- /dev/null
+++ b/src/package/webserver/bin/schedule2trigger.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+java -cp "lib/*:extlib/*" azkaban.migration.schedule2trigger.Schedule2Trigger conf/azkaban.properties
+
diff --git a/src/web/js/azkaban.jobdetails.view.js b/src/web/js/azkaban.jobdetails.view.js
index 3f2b7a2..f191f37 100644
--- a/src/web/js/azkaban.jobdetails.view.js
+++ b/src/web/js/azkaban.jobdetails.view.js
@@ -16,247 +16,184 @@
 
 $.namespace('azkaban');
 
-var logModel;
-azkaban.LogModel = Backbone.Model.extend({});
-
 var jobLogView;
 azkaban.JobLogView = Backbone.View.extend({
 	events: {
-		"click #updateLogBtn" : "handleUpdate"
-	},
-	initialize: function(settings) {
-		this.model.set({"offset": 0});
-		this.handleUpdate();
+		"click #updateLogBtn" : "refresh"
 	},
-	handleUpdate: function(evt) {
-		var requestURL = contextURL + "/executor"; 
-		var model = this.model;
-		var finished = false;
-
-		var date = new Date();
-		var startTime = date.getTime();
-		
-		while (!finished) {
-			var offset = this.model.get("offset");
-			var requestData = {
-				"execid": execId, 
-				"jobId": jobId, 
-				"ajax":"fetchExecJobLogs", 
-				"offset": offset, 
-				"length": 50000, 
-				"attempt": attempt
-			};
-
-			var successHandler = function(data) {
-				console.log("fetchLogs");
-				if (data.error) {
-					console.log(data.error);
-					finished = true;
-				}
-				else if (data.length == 0) {
-					finished = true;
-				}
-				else {
-					var date = new Date();
-					var endTime = date.getTime();
-					if ((endTime - startTime) > 10000) {
-						finished = true;
-						showDialog("Alert","The log is taking a long time to finish loading. Azkaban has stopped loading them. Please click Refresh to restart the load.");
-					} 
 
-					var re = /(https?:\/\/(([-\w\.]+)+(:\d+)?(\/([\w/_\.]*(\?\S+)?)?)?))/g;
-					var log = $("#logSection").text();
-					if (!log) {
-						log = data.data;
-					}
-					else {
-						log += data.data;
-					}
-
-					var newOffset = data.offset + data.length;
-					$("#logSection").text(log);
-					log = $("#logSection").html();
-					log = log.replace(re, "<a href=\"$1\" title=\"\">$1</a>");
-					$("#logSection").html(log);
+	initialize: function() {
+		this.listenTo(this.model, "change:logData", this.render);
+	},
 
-					model.set({"offset": newOffset, "log": log});
-					$(".logViewer").scrollTop(9999);
-				}
-			}
+	refresh: function() {
+		this.model.refresh();
+	},
 
-			$.ajax({
-				url: requestURL,
-				type: "get",
-				async: false,
-				data: requestData,
-				dataType: "json",
-				error: function(data) {
-					console.log(data);
-					finished = true;
-				},
-				success: successHandler
-			});
-		}
+	render: function() {
+		var re = /(https?:\/\/(([-\w\.]+)+(:\d+)?(\/([\w/_\.]*(\?\S+)?)?)?))/g;
+		var log = this.model.get("logData");
+		log = log.replace(re, "<a href=\"$1\" title=\"\">$1</a>");
+		$("#logSection").html(log);
 	}
 });
 
-var summaryModel;
-azkaban.SummaryModel = Backbone.Model.extend({});
-
 var jobSummaryView;
 azkaban.JobSummaryView = Backbone.View.extend({
 	events: {
-		"click #updateSummaryBtn" : "handleUpdate"
+		"click #updateSummaryBtn" : "refresh"
 	},
+
 	initialize: function(settings) {
-		this.handleUpdate();
+		$("#commandSummary").hide();
+		$("#pigJobSummary").hide();
+		$("#pigJobStats").hide();
+		$("#hiveJobSummary").hide();
+
+		this.listenTo(this.model, "change:commandProperties", this.renderCommandTable);
+		this.listenTo(this.model, "change:pigSummary", this.renderPigSummaryTable);
+		this.listenTo(this.model, "change:pigStats", this.renderPigStatsTable);
+		this.listenTo(this.model, "change:hiveSummary", this.renderHiveTable);
 	},
-	handleUpdate: function(evt) {
-		var requestURL = contextURL + "/executor"; 
-		var model = this.model;
-		var self = this;
 
-		var requestData = {
-			"execid": execId, 
-			"jobId": jobId, 
-			"ajax":"fetchExecJobSummary", 
-			"attempt": attempt
-		};
+	refresh: function() {
+		this.model.refresh();
+	},
 
-		$.ajax({
-			url: requestURL,
-			dataType: "json",
-			data: requestData,
-			error: function(data) {
-				console.log(data);
-			},
-			success: function(data) {
-				console.log("fetchSummary");
-				if (data.error) {
-					console.log(data.error);
-				}
-				else {
-					self.renderCommandTable(data.commandProperties);
-					self.renderJobTable(data.summaryTableHeaders, data.summaryTableData, "summary");
-					self.renderJobTable(data.statTableHeaders, data.statTableData, "stats");
-					self.renderHiveTable(data.hiveQueries, data.hiveQueryJobs);
-				}
-			}
-		});
+	handleUpdate: function(evt) {
+		renderJobTable(jobSummary.summaryTableHeaders, jobSummary.summaryTableData, "summary");
+		renderJobTable(jobSummary.statTableHeaders, jobSummary.statTableData, "stats");
+		renderHiveTable(jobSummary.hiveQueries, jobSummary.hiveQueryJobs);
 	},
-	renderCommandTable: function(commandProperties) {
-		if (commandProperties) {
-			var commandTable = $("#commandTable");
-			
-			for (var i = 0; i < commandProperties.length; i++) {
-				var prop = commandProperties[i];
+	renderCommandTable: function() {
+		var commandTable = $("#commandTable");
+		var commandProperties = this.model.get("commandProperties");
+
+		for (var key in commandProperties) {
+			if (commandProperties.hasOwnProperty(key)) {
+				var value = commandProperties[key];
+				if (Array.isArray(value)) {
+					value = value.join("<br/>");
+				}
 				var tr = document.createElement("tr");
-				var name = document.createElement("td");
-				var value = document.createElement("td");
-				$(name).html("<b>" + prop.first + "</b>");
-				$(value).html(prop.second);
-				$(tr).append(name);
-				$(tr).append(value);
+				var keyTd = document.createElement("td");
+				var valueTd = document.createElement("td");
+				$(keyTd).html("<b>" + key + "</b>");
+				$(valueTd).html(value);
+				$(tr).append(keyTd);
+				$(tr).append(valueTd);
 				commandTable.append(tr);
 			}
 		}
+
+		$("#commandSummary").show();
 	},
-	renderJobTable: function(headers, data, prefix) {
-		if (headers) {
-			// Add table headers
-			var header = $("#" + prefix + "Header");
-			var tr = document.createElement("tr");
-			var i;
-			for (i = 0; i < headers.length; i++) {
-				var th = document.createElement("th");
-				$(th).text(headers[i]);
-				$(tr).append(th);
-			}
-			header.append(tr);
-			
-			// Add table body
-			var body = $("#" + prefix + "Body");
-			for (i = 0; i < data.length; i++) {
-				tr = document.createElement("tr");
-				var row = data[i];
-				for (var j = 0; j < row.length; j++) {
-					var td = document.createElement("td");
-					if (j == 0) {
-						// first column is a link to job details page 
-						$(td).html(row[j]);
-					} else {
-						$(td).text(row[j]);
-					}
-					$(tr).append(td);
+	renderPigTable: function(tableName, data) {
+		// Add table headers
+		var header = $("#" + tableName + "Header");
+		var tr = document.createElement("tr");
+		var i;
+		var headers = data[0];
+		var numColumns = headers.length;
+		for (i = 0; i < numColumns; i++) {
+			var th = document.createElement("th");
+			$(th).text(headers[i]);
+			$(tr).append(th);
+		}
+		header.append(tr);
+		
+		// Add table body
+		var body = $("#" + tableName + "Body");
+		for (i = 1; i < data.length; i++) {
+			tr = document.createElement("tr");
+			var row = data[i];
+			for (var j = 0; j < numColumns; j++) {
+				var td = document.createElement("td");
+				if (j == 0) {
+					// first column is a link to job details page 
+					$(td).html(row[j]);
+				} else {
+					$(td).text(row[j]);
 				}
-				body.append(tr);
+				$(tr).append(td);
 			}
-		} else {
-			$("#job" + prefix).hide();
+			body.append(tr);
 		}
+
+		$("#pigJob" + tableName.charAt(0).toUpperCase() + tableName.substring(1)).show();
 	},
-	renderHiveTable: function(queries, queryJobs) {
-		if (queries) {
-			// Set up table column headers
-			var header = $("#hiveTableHeader");
-			var tr = document.createElement("tr");
-			var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
-			var i;
-			
-			for (i = 0; i < headers.length; i++) {
-				var th = document.createElement("th");
-				$(th).text(headers[i]);
-				$(tr).append(th);
-			}
-			header.append(tr);
+	renderPigSummaryTable: function() {
+		this.renderPigTable("summary", this.model.get("pigSummary"));
+	},
+	renderPigStatsTable: function() {
+		this.renderPigTable("stats", this.model.get("pigStats"));
+	},
+	renderHiveTable: function() {
+		var hiveSummary = this.model.get("hiveSummary");
+		var queries = hiveSummary.hiveQueries;
+		var queryJobs = hiveSummary.hiveQueryJobs;
+
+		// Set up table column headers
+		var header = $("#hiveTableHeader");
+		var tr = document.createElement("tr");
+		var headers = ["Query","Job","Map","Reduce","HDFS Read","HDFS Write"];
+		var i;
+		
+		for (i = 0; i < headers.length; i++) {
+			var th = document.createElement("th");
+			$(th).text(headers[i]);
+			$(tr).append(th);
+		}
+		header.html(tr);
+		
+		// Construct table body
+		var oldBody = $("#hiveTableBody");
+		var newBody = $(document.createElement("tbody")).attr("id", "hiveTableBody");
+		for (i = 0; i < queries.length; i++) {
+			// new query
+			tr = document.createElement("tr");
+			var td = document.createElement("td");
+			$(td).html("<b>" + queries[i] + "</b>");
+			$(tr).append(td);
 			
-			// Construct table body
-			var body = $("#hiveTableBody");
-			for (i = 0; i < queries.length; i++) {
-				// new query
-				tr = document.createElement("tr");
-				var td = document.createElement("td");
-				$(td).html("<b>" + queries[i] + "</b>");
-				$(tr).append(td);
+			var jobs = queryJobs[i];
+			if (jobs != null) {
+				// add first job for this query
+				var jobValues = jobs[0];
+				var j;
+				for (j = 0; j < jobValues.length; j++) {
+					td = document.createElement("td");
+					$(td).html(jobValues[j]);
+					$(tr).append(td);
+				}
+				newBody.append(tr);
 				
-				var jobs = queryJobs[i];
-				if (jobs != null) {
-					// add first job for this query
-					var jobValues = jobs[0];
-					var j;
-					for (j = 0; j < jobValues.length; j++) {
-						td = document.createElement("td");
-						$(td).html(jobValues[j]);
-						$(tr).append(td);
-					}
-					body.append(tr);
+				// add remaining jobs for this query
+				for (j = 1; j < jobs.length; j++) {
+					jobValues = jobs[j];
+					tr = document.createElement("tr");
+					
+					// add empty cell for query column
+					td = document.createElement("td");
+					$(td).html("&nbsp;");
+					$(tr).append(td);
 					
-					// add remaining jobs for this query
-					for (j = 1; j < jobs.length; j++) {
-						jobValues = jobs[j];
-						tr = document.createElement("tr");
-						
-						// add empty cell for query column
+					// add job values
+					for (var k = 0; k < jobValues.length; k++) {
 						td = document.createElement("td");
-						$(td).html("&nbsp;");
+						$(td).html(jobValues[k]);
 						$(tr).append(td);
-						
-						// add job values
-						for (var k = 0; k < jobValues.length; k++) {
-							td = document.createElement("td");
-							$(td).html(jobValues[k]);
-							$(tr).append(td);
-						}
-						body.append(tr);
 					}
-					
-				} else {
-					body.append(tr);
+					newBody.append(tr);
 				}
+				
+			} else {
+				newBody.append(tr);
 			}
-		} else {
-			$("#hiveTable").hide();
 		}
+		oldBody.replaceWith(newBody);
+
+		$("#hiveJobSummary").show();
 	}
 });
 
@@ -277,9 +214,6 @@ azkaban.JobTabView = Backbone.View.extend({
 		}
 	},
 
-	render: function() {
-	},
-
 	handleJobLogViewLinkClick: function() {
 		$('#jobSummaryViewLink').removeClass('active');
 		$('#jobSummaryView').hide();
@@ -312,23 +246,24 @@ var showDialog = function(title, message) {
 }
 
 $(function() {
-	var selected;
-	logModel = new azkaban.LogModel();
+	var logDataModel = new azkaban.LogDataModel();
+	
 	jobLogView = new azkaban.JobLogView({
 		el: $('#jobLogView'), 
-		model: logModel
+		model: logDataModel
 	});
 
-	summaryModel = new azkaban.SummaryModel();
 	jobSummaryView = new azkaban.JobSummaryView({
 		el: $('#jobSummaryView'), 
-		model: summaryModel
+		model: logDataModel
 	});
 
 	jobTabView = new azkaban.JobTabView({
 		el: $('#headertabs')
 	});
 
+	logDataModel.refresh();
+
 	if (window.location.hash) {
 		var hash = window.location.hash;
 		if (hash == '#joblog') {
diff --git a/src/web/js/azkaban.logdata.model.js b/src/web/js/azkaban.logdata.model.js
new file mode 100644
index 0000000..2e21386
--- /dev/null
+++ b/src/web/js/azkaban.logdata.model.js
@@ -0,0 +1,302 @@
+$.namespace('azkaban');
+
+azkaban.LogDataModel = Backbone.Model.extend({
+    TIMESTAMP_REGEX: /^.*? - /gm,
+
+    JOB_TRACKER_URL_REGEX: /https?:\/\/[-\w\.]+(?::\d+)?\/[\w\/\.]*\?\S+(job_\d{12}_\d{4,})\S*/,
+
+    // Command properties
+    COMMAND_START: "Command: ",
+    CLASSPATH_REGEX: /(?:-cp|-classpath)\s+(\S+)/g,
+    ENVIRONMENT_VARIABLES_REGEX: /-D(\S+)/g,
+    JVM_MEMORY_REGEX: /(-Xm\S+)/g,
+    PIG_PARAMS_REGEX: /-param\s+(\S+)/g,
+
+    JOB_TYPE_REGEX: /Building (\S+) job executor/,
+
+    PIG_JOB_SUMMARY_START: "HadoopVersion",
+    PIG_JOB_STATS_START: "Job Stats (time in seconds):",
+
+    HIVE_PARSING_START: "Parsing command: ",
+    HIVE_PARSING_END: "Parse Completed",
+    HIVE_NUM_MAP_REDUCE_JOBS_STRING: "Total MapReduce jobs = ",
+    HIVE_MAP_REDUCE_JOB_START: "Starting Job",
+    HIVE_MAP_REDUCE_JOBS_SUMMARY: "MapReduce Jobs Launched:",
+    HIVE_MAP_REDUCE_SUMMARY_REGEX: /Job (\d+): Map: (\d+)  Reduce: (\d+)   HDFS Read: (\d+) HDFS Write: (\d+)/,
+
+    initialize: function() {
+        this.set("offset", 0 );
+        this.set("logData", "");
+        this.on("change:logData", this.parseLogData);
+    },
+
+    refresh: function() {
+        var requestURL = contextURL + "/executor"; 
+        var finished = false;
+
+        var date = new Date();
+        var startTime = date.getTime();
+        
+        while (!finished) {
+            var requestData = {
+                "execid": execId,
+                "jobId": jobId,
+                "ajax":"fetchExecJobLogs",
+                "offset": this.get("offset"),
+                "length": 50000,
+                "attempt": attempt
+            };
+
+            var self = this;
+
+            var successHandler = function(data) {
+                console.log("fetchLogs");
+                if (data.error) {
+                    console.log(data.error);
+                    finished = true;
+                }
+                else if (data.length == 0) {
+                    finished = true;
+                }
+                else {
+                    var date = new Date();
+                    var endTime = date.getTime();
+                    if ((endTime - startTime) > 10000) {
+                        finished = true;
+                        showDialog("Alert","The log is taking a long time to finish loading. Azkaban has stopped loading them. Please click Refresh to restart the load.");
+                    }
+
+                    self.set("offset", data.offset + data.length);
+                    self.set("logData", self.get("logData") + data.data);
+                }
+            }
+
+            $.ajax({
+                url: requestURL,
+                type: "get",
+                async: false,
+                data: requestData,
+                dataType: "json",
+                error: function(data) {
+                    console.log(data);
+                    finished = true;
+                },
+                success: successHandler
+            });
+        }
+    },
+
+    parseLogData: function() {
+        var data = this.get("logData").replace(this.TIMESTAMP_REGEX, "");
+        var lines = data.split("\n");
+
+        if (this.parseCommand(lines)) {
+            this.parseJobTrackerUrls(lines);
+
+            var jobType = this.parseJobType(lines);
+            if (jobType.indexOf("pig") !== -1) {
+                this.parsePigTable(lines, "pigSummary", this.PIG_JOB_SUMMARY_START, "", 0);
+                this.parsePigTable(lines, "pigStats", this.PIG_JOB_STATS_START, "", 1);
+            } else if (jobType.indexOf("hive") !== -1) {
+                this.parseHiveQueries(lines);
+            }
+        }
+    },
+
+    parseCommand: function(lines) {
+        var commandStartIndex = -1;
+        var numLines = lines.length;
+        for (var i = 0; i < numLines; i++) {
+            if (lines[i].indexOf(this.COMMAND_START) === 0) {
+                commandStartIndex = i;
+                break;
+            }
+        }
+        
+        if (commandStartIndex != -1) {
+            var commandProperties = {};
+
+            var command = lines[commandStartIndex].substring(this.COMMAND_START.length);
+            commandProperties.Command = command;
+            
+            this.parseCommandProperty(command, commandProperties, "Classpath", this.CLASSPATH_REGEX, ':');
+            this.parseCommandProperty(command, commandProperties, "-D", this.ENVIRONMENT_VARIABLES_REGEX);
+            this.parseCommandProperty(command, commandProperties, "Memory Settings", this.JVM_MEMORY_REGEX);
+            this.parseCommandProperty(command, commandProperties, "Params", this.PIG_PARAMS_REGEX);
+            
+            this.set("commandProperties", commandProperties);
+
+            return true;
+        }
+        
+        return false;
+    },
+
+    parseCommandProperty: function(command, commandProperties, propertyName, regex, split) {
+        var results = [];
+        var match;
+        while (match = regex.exec(command)) {
+            if (split) {
+                results = results.concat(match[1].split(split));
+            } else {
+                results.push(match[1]);
+            }
+        }
+
+        if (results.length > 0) {
+            commandProperties[propertyName] = results;
+        }
+    },
+
+    parseJobTrackerUrls: function(lines) {
+        var jobTrackerUrls = {};
+        var jobTrackerUrlsOrdered = [];
+        var numLines = lines.length;
+        var match;
+        for (var i = 0; i < numLines; i++) {
+            if ((match = this.JOB_TRACKER_URL_REGEX.exec(lines[i])) && !jobTrackerUrls[match[1]]) {
+                jobTrackerUrls[match[1]] = match[0];
+                jobTrackerUrlsOrdered.push(match[0]);
+            }
+        }
+        this.set("jobTrackerUrls", jobTrackerUrls);
+        this.set("jobTrackerUrlsOrdered", jobTrackerUrlsOrdered);
+    },
+
+    parseJobType: function(lines) {
+        var numLines = lines.length;
+        var match;
+        for (var i = 0; numLines; i++) {
+            if (match = this.JOB_TYPE_REGEX.exec(lines[i])) {
+                return match[1];
+            }
+        }
+        
+        return null;
+    },
+
+    parsePigTable: function(lines, tableName, startPattern, endPattern, linesToSkipAfterStart) {
+        var index = -1;
+        var numLines = lines.length;
+        for (var i = 0; i < numLines; i++) {
+            if (lines[i].indexOf(startPattern) === 0) {
+                index = i + linesToSkipAfterStart;
+                break;
+            }
+        }
+        
+        if (index != -1) {
+            var table = [];
+            var line;
+            while ((line = lines[index]) !== endPattern) {
+                var columns = line.split("\t");
+                // If first column is a job id, make it a link to the job tracker.
+                if (this.get("jobTrackerUrls")[columns[0]]) {
+                    columns[0] = "<a href='" + this.get("jobTrackerUrls")[columns[0]] + "'>" + columns[0] + "</a>";
+                }
+                table.push(columns);
+                index++;
+            }
+
+            this.set(tableName, table);
+        }
+    },
+
+    parseHiveQueries: function(lines) {
+        var hiveQueries = [];
+        var hiveQueryJobs = [];
+
+        var currMapReduceJob = 0;
+        var numLines = lines.length;
+        for (var i = 0; i < numLines;) {
+            var line = lines[i];
+            var parsingCommandIndex = line.indexOf(this.HIVE_PARSING_START);
+            if (parsingCommandIndex === -1) {
+                i++;
+                continue;
+            }
+
+            // parse query text, which could span multiple lines
+            var queryStartIndex = parsingCommandIndex + this.HIVE_PARSING_START.length;
+            var query = line.substring(queryStartIndex) + "<br/>";
+            
+            i++;
+            while (i < numLines && (line = lines[i]).indexOf(this.HIVE_PARSING_END) === -1) {
+                query += line + "<br/>";
+                i++;
+            }
+            hiveQueries.push(query);
+            i++;
+            
+            // parse the query's Map-Reduce jobs, if any.
+            var numMRJobs = 0;
+            while (i < numLines) {
+                line = lines[i];
+                if (line.contains(this.HIVE_NUM_MAP_REDUCE_JOBS_STRING)) {
+                    // query involves map reduce jobs
+                    var numMRJobs = parseInt(line.substring(this.HIVE_NUM_MAP_REDUCE_JOBS_STRING.length),10);
+                    i++;
+                    
+                    // get the map reduce jobs summary
+                    while (i < numLines) {
+                        line = lines[i];
+                        if (line.contains(this.HIVE_MAP_REDUCE_JOBS_SUMMARY)) {
+                            // job summary table found
+                            i++;
+                            
+                            var queryJobs = [];
+                            
+                            var previousJob = -1;
+                            var numJobsSeen = 0;
+                            while (numJobsSeen < numMRJobs && i < numLines) {
+                                line = lines[i];
+                                var match;
+                                if (match = this.HIVE_MAP_REDUCE_SUMMARY_REGEX.exec(line)) {
+                                    var currJob = parseInt(match[1], 10);
+                                    if (currJob === previousJob) {
+                                        i++;
+                                        continue;
+                                    }
+                                    
+                                    var job = [];
+                                    job.push("<a href='" + this.get("jobTrackerUrlsOrdered")[currMapReduceJob++] + "'>" + currJob + "</a>");
+                                    job.push(match[2]);
+                                    job.push(match[3]);
+                                    job.push(match[4]);
+                                    job.push(match[5]);
+                                    queryJobs.push(job);
+                                    previousJob = currJob;
+                                    numJobsSeen++;
+                                }
+                                i++;
+                            }
+                            
+                            if (numJobsSeen === numMRJobs) {
+                                hiveQueryJobs.push(queryJobs);
+                            }
+                            
+                            break;
+                        }
+                        i++;
+                    } 
+                    break;
+                }
+                else if (line.contains(this.HIVE_PARSING_START)) {
+                    if (numMRJobs === 0) {
+                        hiveQueryJobs.push(null);
+                    }
+                    break;
+                }
+                i++;
+            }
+            continue;
+        }
+
+        if (hiveQueries.length > 0) {
+            this.set("hiveSummary", {
+                hiveQueries: hiveQueries,
+                hiveQueryJobs: hiveQueryJobs
+            });
+        }
+    }
+});