azkaban-memoizeit

Merge pull request #15 from lishd/master Added schedule_id

5/15/2013 9:07:01 PM

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 883a19f..b449654 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -34,6 +34,7 @@ import azkaban.utils.JSONUtils;
 public class ExecutableFlow {
 	private int executionId = -1;
 	private String flowId;
+	private int scheduleId = -1;
 	private int projectId;
 	private int version;
 
@@ -57,6 +58,7 @@ public class ExecutableFlow {
 	
 	public ExecutableFlow(Flow flow) {
 		this.projectId = flow.getProjectId();
+		this.scheduleId = -1;
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
 		this.setFlow(flow);
@@ -64,6 +66,7 @@ public class ExecutableFlow {
 	
 	public ExecutableFlow(int executionId, Flow flow) {
 		this.projectId = flow.getProjectId();
+		this.scheduleId = -1;
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
 		this.executionId = executionId;
@@ -208,6 +211,14 @@ public class ExecutableFlow {
 		this.projectId = projectId;
 	}
 
+	public int getScheduleId() {
+		return scheduleId;
+	}
+
+	public void setScheduleId(int scheduleId) {
+		this.scheduleId = scheduleId;
+	}
+
 	public String getExecutionPath() {
 		return executionPath;
 	}
@@ -255,6 +266,9 @@ public class ExecutableFlow {
 		flowObj.put("executionPath", executionPath);
 		flowObj.put("flowId", flowId);
 		flowObj.put("projectId", projectId);
+		if(scheduleId >= 0) {
+			flowObj.put("scheduleId", scheduleId);
+		}
 		flowObj.put("submitTime", submitTime);
 		flowObj.put("startTime", startTime);
 		flowObj.put("endTime", endTime);
@@ -370,6 +384,9 @@ public class ExecutableFlow {
 		exFlow.executionPath = (String)flowObj.get("executionPath");
 		exFlow.flowId = (String)flowObj.get("flowId");
 		exFlow.projectId = (Integer)flowObj.get("projectId");
+		if (flowObj.containsKey("scheduleId")) {
+			exFlow.scheduleId = (Integer)flowObj.get("scheduleId");
+		}
 		exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
 		exFlow.startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
 		exFlow.endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 4b70fbc..8118d08 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -46,19 +46,19 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 	private static final String scheduleTableName = "schedules";
 
 	private static String SELECT_ALL_SCHEDULES =
-			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
+			"SELECT schedule_id, project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
 	
 	private static String INSERT_SCHEDULE = 
 			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
 	
 	private static String REMOVE_SCHEDULE_BY_KEY = 
-			"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
+			"DELETE FROM " + scheduleTableName + " WHERE schedule_id=?";
 	
 	private static String UPDATE_SCHEDULE_BY_KEY = 
-			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
+			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE schedule_id=?";
 	
 	private static String UPDATE_NEXT_EXEC_TIME = 
-			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE schedule_id=?";
 
 	public EncodingType getDefaultEncodingType() {
 		return defaultEncodingType;
@@ -127,7 +127,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 
 		QueryRunner runner = createQueryRunner();
 		try {
-			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
+			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getScheduleId());
 			if (removes == 0) {
 				throw new ScheduleManagerException("No schedule has been removed.");
 			}
@@ -177,6 +177,15 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 					s.getSubmitUser(),
 					encType.getNumVal(),
 					data);
+			
+			long id = runner.query(LastInsertID.LAST_INSERT_ID, new LastInsertID());
+
+			if (id == -1l) {
+				throw new ScheduleManagerException("Execution id is not properly created.");
+			}
+			logger.info("Schedule given " + s.getScheduleIdentityPair() + " given id " + id);
+			s.setScheduleId((int)id);
+			
 			if (inserts == 0) {
 				throw new ScheduleManagerException("No schedule has been inserted.");
 			}
@@ -194,7 +203,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 		QueryRunner runner = new QueryRunner();
 		try {
 			
-			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName()); 
+			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getScheduleId()); 
 		} catch (SQLException e) {
 			e.printStackTrace();
 			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
@@ -242,8 +251,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 					s.getSubmitUser(), 	
 					encType.getNumVal(),
 					data,
-					s.getProjectId(), 
-					s.getFlowName());
+					s.getScheduleId());
 			if (updates == 0) {
 				throw new ScheduleManagerException("No schedule has been updated.");
 			}
@@ -253,6 +261,22 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 		}
 	}
 
+	
+	private static class LastInsertID implements ResultSetHandler<Long> {
+		private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
+		
+		@Override
+		public Long handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return -1l;
+			}
+
+			long id = rs.getLong(1);
+			return id;
+		}
+		
+	}
+	
 	public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
 		@Override
 		public List<Schedule> handle(ResultSet rs) throws SQLException {
@@ -262,19 +286,20 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 			
 			ArrayList<Schedule> schedules = new ArrayList<Schedule>();
 			do {
-				int projectId = rs.getInt(1);
-				String projectName = rs.getString(2);
-				String flowName = rs.getString(3);
-				String status = rs.getString(4);
-				long firstSchedTime = rs.getLong(5);
-				DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
-				ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
-				long lastModifyTime = rs.getLong(8);
-				long nextExecTime = rs.getLong(9);
-				long submitTime = rs.getLong(10);
-				String submitUser = rs.getString(11);
-				int encodingType = rs.getInt(12);
-				byte[] data = rs.getBytes(13);
+				int scheduleId = rs.getInt(1);
+				int projectId = rs.getInt(2);
+				String projectName = rs.getString(3);
+				String flowName = rs.getString(4);
+				String status = rs.getString(5);
+				long firstSchedTime = rs.getLong(6);
+				DateTimeZone timezone = DateTimeZone.forID(rs.getString(7));
+				ReadablePeriod period = Schedule.parsePeriodString(rs.getString(8));
+				long lastModifyTime = rs.getLong(9);
+				long nextExecTime = rs.getLong(10);
+				long submitTime = rs.getLong(11);
+				String submitUser = rs.getString(12);
+				int encodingType = rs.getInt(13);
+				byte[] data = rs.getBytes(14);
 				
 				Object optsObj = null;
 				if (data != null) {
@@ -296,7 +321,7 @@ public class JdbcScheduleLoader extends AbstractJdbcLoader implements ScheduleLo
 					}
 				}
 				
-				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+				Schedule s = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
 				if (optsObj != null) {
 					s.createAndSetScheduleOptions(optsObj);
 				}
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index b27cc96..f11c0fe 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -40,7 +40,8 @@ public class Schedule{
 //	private long flowGuid;
 	
 //	private String scheduleId;
-	
+
+	private int scheduleId;
 	private int projectId;
 	private String projectName;
 	private String flowName;
@@ -57,6 +58,7 @@ public class Schedule{
 	private SlaOptions slaOptions;
 	
 	public Schedule(
+						int scheduleId,
 						int projectId,
 						String projectName,
 						String flowName,
@@ -69,22 +71,26 @@ public class Schedule{
 						long submitTime,
 						String submitUser
 						) {
-		this.projectId = projectId;
-		this.projectName = projectName;
-		this.flowName = flowName;
-		this.firstSchedTime = firstSchedTime;
-		this.timezone = timezone;
-		this.lastModifyTime = lastModifyTime;
-		this.period = period;
-		this.nextExecTime = nextExecTime;
-		this.submitUser = submitUser;
-		this.status = status;
-		this.submitTime = submitTime;
-		this.executionOptions = null;
-		this.slaOptions = null;
+
+		this(scheduleId, 
+				projectId, 
+				projectName, 
+				flowName, 
+				status,
+				firstSchedTime, 
+				timezone,
+				period,
+				lastModifyTime,
+				nextExecTime,
+				submitTime,
+				submitUser,
+				null,
+				null
+				);
 	}
 
 	public Schedule(
+						int scheduleId,
 						int projectId,
 						String projectName,
 						String flowName,
@@ -99,22 +105,24 @@ public class Schedule{
 						ExecutionOptions executionOptions,
 						SlaOptions slaOptions
 			) {
-		this.projectId = projectId;
-		this.projectName = projectName;
-		this.flowName = flowName;
-		this.firstSchedTime = firstSchedTime;
-		this.timezone = DateTimeZone.forID(timezoneId);
-		this.lastModifyTime = lastModifyTime;
-		this.period = parsePeriodString(period);
-		this.nextExecTime = nextExecTime;
-		this.submitUser = submitUser;
-		this.status = status;
-		this.submitTime = submitTime;
-		this.executionOptions = executionOptions;
-		this.slaOptions = slaOptions;
+		this(scheduleId, projectId, 
+				projectName, 
+				flowName, 
+				status,
+				firstSchedTime, 
+				DateTimeZone.forID(timezoneId),
+				parsePeriodString(period),
+				lastModifyTime,
+				nextExecTime,
+				submitTime,
+				submitUser,
+				executionOptions,
+				slaOptions
+				);
 	}
 
 	public Schedule(
+						int scheduleId,
 						int projectId,
 						String projectName,
 						String flowName,
@@ -129,6 +137,7 @@ public class Schedule{
 						ExecutionOptions executionOptions,
 						SlaOptions slaOptions
 						) {
+		this.scheduleId = scheduleId;
 		this.projectId = projectId;
 		this.projectName = projectName;
 		this.flowName = flowName;
@@ -169,9 +178,17 @@ public class Schedule{
 				new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
 	}
 	
-	public Pair<Integer, String> getScheduleId() {
+	public Pair<Integer, String> getScheduleIdentityPair() {
 		return new Pair<Integer, String>(getProjectId(), getFlowName());
 	}
+
+	public void setScheduleId(int scheduleId) {
+		this.scheduleId = scheduleId;
+	}
+	
+	public int getScheduleId() {
+		return scheduleId;
+	}
 	
 	public int getProjectId() {
 		return projectId;
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 5688f66..de90a5c 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -19,9 +19,11 @@ package azkaban.scheduler;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -36,15 +38,13 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
-
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
-
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLAManager;
 import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SLAManager;
 import azkaban.sla.SlaOptions;
 import azkaban.utils.Pair;
 
@@ -60,7 +60,8 @@ public class ScheduleManager {
 	private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
 	private ScheduleLoader loader;
 
-	private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
+	private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
+	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
 	private final ScheduleRunner runner;
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
@@ -126,20 +127,48 @@ public class ScheduleManager {
 	 * @param id
 	 * @return
 	*/
-	public Schedule getSchedule(int projectId, String flowId) {
-		return scheduleIDMap.get(new Pair<Integer,String>(projectId, flowId));
+	public Set<Schedule> getSchedules(int projectId, String flowId) {
+		return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+	}
+
+	/**
+	 * Returns the scheduled flow for the scheduleId
+	 * 
+	 * @param id
+	 * @return
+	*/
+	public Schedule getSchedule(int scheduleId) {
+		return scheduleIDMap.get(scheduleId);
 	}
 
+
 	/**
 	 * Removes the flow from the schedule if it exists.
 	 * 
 	 * @param id
 	 */
-	public synchronized void removeSchedule(int projectId, String flowId) {
-		Pair<Integer,String> scheduleId = new Pair<Integer,String>(projectId, flowId);
-		
-		Schedule sched = scheduleIDMap.get(scheduleId);
-		scheduleIDMap.remove(scheduleId);
+	public synchronized void removeSchedules(int projectId, String flowId) {
+		Set<Schedule> schedules = getSchedules(projectId, flowId);
+		for(Schedule sched : schedules) {
+			removeSchedule(sched);
+		}
+	}
+	/**
+	 * Removes the flow from the schedule if it exists.
+	 * 
+	 * @param id
+	 */
+	public synchronized void removeSchedule(Schedule sched) {
+
+		Pair<Integer,String> identityPairMap = new Pair<Integer,String>(sched.getProjectId(), sched.getFlowName());
+		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
+		if(schedules != null) {
+			schedules.remove(sched);
+			if(schedules.size() == 0) {
+				scheduleIdentityPairMap.remove(identityPairMap);
+			}
+		}
+		scheduleIDMap.remove(sched.getScheduleId());
 		
 		runner.removeRunnerSchedule(sched);
 		try {
@@ -173,6 +202,7 @@ public class ScheduleManager {
 	// }
 
 	public Schedule scheduleFlow(
+			final int scheduleId,
 			final int projectId,
 			final String projectName,
 			final String flowName,
@@ -185,10 +215,11 @@ public class ScheduleManager {
 			final long submitTime,
 			final String submitUser
 			) {
-		return scheduleFlow(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
 	}
 	
 	public Schedule scheduleFlow(
+			final int scheduleId,
 			final int projectId,
 			final String projectName,
 			final String flowName,
@@ -203,7 +234,7 @@ public class ScheduleManager {
 			ExecutionOptions execOptions,
 			SlaOptions slaOptions
 			) {
-		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -225,6 +256,12 @@ public class ScheduleManager {
 		s.updateTime();
 		this.runner.addRunnerSchedule(s);
 		scheduleIDMap.put(s.getScheduleId(), s);
+		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+		if(schedules == null) {
+			schedules = new HashSet<Schedule>();
+			scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
+		}
+		schedules.add(s);
 	}
 
 	/**
@@ -233,7 +270,7 @@ public class ScheduleManager {
 	 * @param flow
 	 */
 	public synchronized void insertSchedule(Schedule s) {
-		boolean exist = scheduleIDMap.containsKey(s.getScheduleId());
+		boolean exist = scheduleIdentityPairMap.containsKey(s.getScheduleIdentityPair());
 		if(s.updateTime()) {
 			internalSchedule(s);
 			try {
@@ -370,6 +407,7 @@ public class ScheduleManager {
 
 									// Create ExecutableFlow
 									ExecutableFlow exflow = new ExecutableFlow(flow);
+									exflow.setScheduleId(runningSched.getScheduleId());
 									exflow.setSubmitUser(runningSched.getSubmitUser());
 									exflow.addAllProxyUsers(project.getProxyUsers());
 									
@@ -441,7 +479,7 @@ public class ScheduleManager {
 									loader.updateSchedule(runningSched);
 								}
 								else {
-									removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
+									removeSchedule(runningSched);
 								}								
 							} else {
 								// wait until flow run
diff --git a/src/java/azkaban/utils/db/h2/schedules.sql b/src/java/azkaban/utils/db/h2/schedules.sql
index 1924ecd..b759188 100644
--- a/src/java/azkaban/utils/db/h2/schedules.sql
+++ b/src/java/azkaban/utils/db/h2/schedules.sql
@@ -1,4 +1,5 @@
 CREATE TABLE schedules (
+	schedule_id INT NOT NULL AUTO_INCREMENT,
 	project_id INT NOT NULL,
 	project_name VARCHAR(128) NOT NULL,
 	flow_name VARCHAR(128) NOT NULL,
@@ -12,5 +13,7 @@ CREATE TABLE schedules (
 	submit_user VARCHAR(128),
 	enc_type TINYINT,
 	schedule_options LONGBLOB,
-	primary key(project_id, flow_name)
+	PRIMARY KEY (schedule_id)
 );
+
+CREATE INDEX sched_project_id ON schedules(project_id, flow_name);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index ea19307..f10b43e 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -567,7 +567,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("nodeStatus", nodeStatus);
 		ret.put("disabled", options.getDisabledJobs());
 		
-		Schedule sflow = scheduleManager.getSchedule(project.getId(), exflow.getFlowId());
+		Schedule sflow = null;// = scheduleManager.getSchedule(project.getId(), exflow.getFlowId());
 		
 		for (Schedule sched: scheduleManager.getSchedules()) {
 			if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(exflow.getFlowId())) {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 7dcc6ab..4f4bd32 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -109,16 +109,16 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
 		try {
 			
-			int projectId = getIntParam(req, "projectId");
-			String flowName = getParam(req, "flowName");
+			int scheduleId = getIntParam(req, "scheduleId");
 			
-			Project project = projectManager.getProject(projectId);
+			Schedule sched = scheduleManager.getSchedule(scheduleId);
+			
+			Project project = projectManager.getProject(sched.getProjectId());
 			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
 				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
 				return;
 			}
 			
-			Schedule sched = scheduleManager.getSchedule(projectId, flowName);
 			
 			SlaOptions slaOptions= new SlaOptions();
 			
@@ -155,7 +155,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			scheduleManager.insertSchedule(sched);
 
 			if(slaOptions != null) {
-				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + flowName + " has been added/changed.");
+				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
 			}
 			
 		} catch (ServletException e) {
@@ -204,26 +204,24 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-		int projId;
-		String flowName;
+		int scheduleId;
 		try {
-			projId = getIntParam(req, "projId");
-			flowName = getParam(req, "flowName");
+			scheduleId = getIntParam(req, "scheduleId");
+			
+			Schedule sched = scheduleManager.getSchedule(scheduleId);
 			
-			Project project = getProjectAjaxByPermission(ret, projId, user, Type.READ);
+			Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
 			if (project == null) {
-				ret.put("error", "Error loading project. Project " + projId + " doesn't exist");
+				ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
 				return;
 			}
 			
-			Flow flow = project.getFlow(flowName);
+			Flow flow = project.getFlow(sched.getFlowName());
 			if (flow == null) {
-				ret.put("error", "Error loading flow. Flow " + flowName + " doesn't exist in " + projId);
+				ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
 				return;
 			}
 			
-			Schedule sched = scheduleManager.getSchedule(projId, flowName);
-			
 			SlaOptions slaOptions = sched.getSlaOptions();
 			ExecutionOptions flowOptions = sched.getExecutionOptions();
 			
@@ -433,32 +431,34 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
-		int projectId = getIntParam(req, "projectId");
-		String flowName = getParam(req, "flowName");
-		Schedule sched = scheduleManager.getSchedule(projectId, flowName);
-
-//		int projectId = sched.getProjectId();
+		int scheduleId = getIntParam(req, "scheduleId");
+		Schedule sched = scheduleManager.getSchedule(scheduleId);
+		if(sched == null) {
+			ret.put("message", "Schedule with ID " + scheduleId + " does not exist");
+			ret.put("status", "error");
+			return;
+		}
 
-		Project project = projectManager.getProject(projectId);
+		Project project = projectManager.getProject(sched.getProjectId());
 		
 		if (project == null) {
-			ret.put("message", "Project " + projectId + " does not exist");
+			ret.put("message", "Project " + sched.getProjectId() + " does not exist");
 			ret.put("status", "error");
 			return;
 		}
 		
 		if(!hasPermission(project, user, Type.SCHEDULE)) {
 			ret.put("status", "error");
-			ret.put("message", "Permission denied. Cannot remove schedule " + projectId + "."  + flowName);
+			ret.put("message", "Permission denied. Cannot remove schedule with id " + scheduleId);
 			return;
 		}
 
-		scheduleManager.removeSchedule(projectId, flowName);
+		scheduleManager.removeSchedule(sched);
 		logger.info("User '" + user.getUserId() + " has removed schedule " + sched.getScheduleName());
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + sched.toString() + " has been removed.");
 		
 		ret.put("status", "success");
-		ret.put("message", "flow " + flowName + " removed from Schedules.");
+		ret.put("message", "flow " + sched.getFlowName() + " removed from Schedules.");
 		return;
 	}
 
@@ -509,7 +509,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 		}
 		
-		Schedule sched = scheduleManager.getSchedule(projectId, flowName);
+		// Schedule sched = scheduleManager.getSchedule(projectId, flowName);
 		ExecutionOptions flowOptions = null;
 		try {
 			flowOptions = HttpRequestUtils.parseFlowOptions(req);
@@ -518,12 +518,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 		}
 		SlaOptions slaOptions = null;
-		if(sched != null) {
-			if(sched.getSlaOptions() != null) {
-				slaOptions = sched.getSlaOptions();
-			}
-		}
-		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+		//		if(sched != null) {
+		//			if(sched.getSlaOptions() != null) {
+		//				slaOptions = sched.getSlaOptions();
+		//			}
+		//		}
+		Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index f65bdfb..4e2fbd8 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -69,6 +69,7 @@
 					<thead>
 						<tr>
 							<!--th class="execid">Execution Id</th-->
+							<th>ID</th>
 							<th>Flow</th>
 							<th>Project</th>
 							<th>Submitted By</th>
@@ -84,6 +85,7 @@
 #foreach($sched in $schedules)
 						<tr class="row" >
 
+							<td>${sched.scheduleId}</td>
 							<td class="tb-name">
 								<a href="${context}/manager?project=${sched.projectName}&flow=${sched.flowName}">${sched.flowName}</a>
 							</td>
@@ -95,8 +97,8 @@
 							<td>$utils.formatDateTime(${sched.nextExecTime})</td>
 							<td>$utils.formatPeriod(${sched.period})</td>
 							<td>#if(${sched.slaOptions}) true #else false #end</td>
-							<td><button id="removeSchedBtn" onclick="removeSched(${sched.projectId}, '${sched.flowName}')" >Remove Schedule</button></td>
-							<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.projectId}, '${sched.flowName}')" >Set SLA</button></td>
+							<td><button id="removeSchedBtn" onclick="removeSched(${sched.scheduleId})" >Remove Schedule</button></td>
+							<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.scheduleId}, '${sched.flowName}')" >Set SLA</button></td>
 						</tr>
 #end
 #else
diff --git a/src/sql/create_all.sql~ b/src/sql/create_all.sql~
new file mode 100644
index 0000000..70deb54
--- /dev/null
+++ b/src/sql/create_all.sql~
@@ -0,0 +1,168 @@
+CREATE TABLE active_executing_flows (
+	exec_id INT,
+	host VARCHAR(255),
+	port INT,
+	update_time BIGINT,
+	PRIMARY KEY (exec_id)
+) ENGINE=InnoDB;
+
+CREATE TABLE execution_flows (
+	exec_id INT NOT NULL AUTO_INCREMENT,
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	flow_id VARCHAR(128) NOT NULL,
+	status TINYINT,
+	submit_user VARCHAR(64),
+	submit_time BIGINT,
+	update_time BIGINT,
+	start_time BIGINT,
+	end_time BIGINT,
+	enc_type TINYINT,
+	flow_data LONGBLOB,
+	PRIMARY KEY (exec_id),
+	INDEX start_time (start_time),
+	INDEX end_time (end_time),
+	INDEX time_range (start_time, end_time),
+	INDEX (project_id, flow_id)
+) ENGINE=InnoDB;
+
+CREATE TABLE execution_jobs (
+	exec_id INT NOT NULL,
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	flow_id VARCHAR(128) NOT NULL,
+	job_id VARCHAR(128) NOT NULL,
+	attempt INT,
+	start_time BIGINT,
+	end_time BIGINT,
+	status TINYINT,
+	input_params LONGBLOB,
+	output_params LONGBLOB,
+	attachments LONGBLOB,
+	PRIMARY KEY (exec_id, job_id, attempt),
+	INDEX exec_job (exec_id, job_id),
+	INDEX exec_id (exec_id),
+	INDEX job_id (project_id, job_id)
+) ENGINE=InnoDB;
+
+CREATE TABLE execution_logs (
+	exec_id INT NOT NULL,
+	name VARCHAR(128),
+	attempt INT,
+	enc_type TINYINT,
+	start_byte INT,
+	end_byte INT,
+	log LONGBLOB,
+	upload_time BIGINT,
+	PRIMARY KEY (exec_id, name, attempt, start_byte),
+	INDEX log_attempt (exec_id, name, attempt),
+	INDEX log_index (exec_id, name)
+) ENGINE=InnoDB;
+
+CREATE TABLE project_events (
+	project_id INT NOT NULL,
+	event_type TINYINT NOT NULL,
+	event_time BIGINT NOT NULL,
+	username VARCHAR(64),
+	message VARCHAR(512),
+	INDEX log (project_id, event_time)
+) ENGINE=InnoDB;
+
+CREATE TABLE project_files (
+	project_id INT NOT NULL,
+	version INT not NULL,
+	chunk INT,
+	size INT,
+	file LONGBLOB,
+	PRIMARY KEY (project_id, version, chunk),
+	INDEX file_version (project_id, version)
+) ENGINE=InnoDB;
+
+CREATE TABLE project_flows (
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	flow_id VARCHAR(128),
+	modified_time BIGINT NOT NULL,
+	encoding_type TINYINT,
+	json BLOB,
+	PRIMARY KEY (project_id, version, flow_id),
+	INDEX (project_id, version)
+) ENGINE=InnoDB;
+
+
+CREATE TABLE project_permissions (
+	project_id VARCHAR(64) NOT NULL,
+	modified_time BIGINT NOT NULL,
+	name VARCHAR(64) NOT NULL,
+	permissions INT NOT NULL,
+	isGroup BOOLEAN NOT NULL,
+	PRIMARY KEY (project_id, name),
+	INDEX project_id (project_id)
+) ENGINE=InnoDB;
+
+CREATE TABLE project_properties (
+	project_id INT NOT NULL,
+	version INT NOT NULL,
+	name VARCHAR(128),
+	modified_time BIGINT NOT NULL,
+	encoding_type TINYINT,
+	property BLOB,
+	PRIMARY KEY (project_id, version, name),
+	INDEX (project_id, version)
+) ENGINE=InnoDB;
+
+CREATE TABLE projects (
+	id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+	name VARCHAR(64) NOT NULL,
+	active BOOLEAN,
+	modified_time BIGINT NOT NULL,
+	create_time BIGINT NOT NULL,
+	version INT,
+	last_modified_by VARCHAR(64) NOT NULL,
+	description VARCHAR(255),
+	enc_type TINYINT,
+	settings_blob LONGBLOB,
+	UNIQUE INDEX project_id (id),
+	INDEX project_name (name)
+) ENGINE=InnoDB;
+
+CREATE TABLE project_versions (
+	project_id INT NOT NULL,
+	version INT not NULL,
+	upload_time BIGINT NOT NULL,
+	uploader VARCHAR(64) NOT NULL,
+	file_type VARCHAR(16),
+	file_name VARCHAR(128),
+	md5 BINARY(16),
+	num_chunks INT,
+	PRIMARY KEY (project_id, version),
+	INDEX project_version_id (project_id)
+) ENGINE=InnoDB;
+
+CREATE TABLE schedules (
+	project_id INT NOT NULL,
+	project_name VARCHAR(128) NOT NULL,
+	flow_name VARCHAR(128) NOT NULL,
+	status VARCHAR(16),
+	first_sched_time BIGINT,
+	timezone VARCHAR(64),
+	period VARCHAR(16),
+	last_modify_time BIGINT,
+	next_exec_time BIGINT,
+	submit_time BIGINT,
+	submit_user VARCHAR(128),
+	enc_type TINYINT,
+	schedule_options LONGBLOB,
+	primary key(project_id, flow_name)
+) ENGINE=InnoDB;
+
+
+CREATE TABLE active_sla (
+	exec_id INT NOT NULL,
+	job_name VARCHAR(128) NOT NULL,
+	check_time BIGINT NOT NULL,
+	rule TINYINT NOT NULL,
+	enc_type TINYINT,
+	options LONGBLOB NOT NULL,
+	primary key(exec_id, job_name)
+) ENGINE=InnoDB;
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index a2ca6d2..32b7d86 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -1,4 +1,5 @@
 CREATE TABLE schedules (
+	schedule_id INT NOT NULL AUTO_INCREMENT,
 	project_id INT NOT NULL,
 	project_name VARCHAR(128) NOT NULL,
 	flow_name VARCHAR(128) NOT NULL,
@@ -12,6 +13,6 @@ CREATE TABLE schedules (
 	submit_user VARCHAR(128),
 	enc_type TINYINT,
 	schedule_options LONGBLOB,
-	primary key(project_id, flow_name)
-) ENGINE=InnoDB;
-
+	PRIMARY KEY (schedule_id)
+	INDEX project_id (project_id, flow_name),
+);
diff --git a/src/sql/update_2.0_to_2.1.sql b/src/sql/update_2.0_to_2.1.sql
index 6420d1f..b8554e1 100644
--- a/src/sql/update_2.0_to_2.1.sql
+++ b/src/sql/update_2.0_to_2.1.sql
@@ -12,11 +12,15 @@ UPDATE execution_logs SET upload_time=(UNIX_TIMESTAMP()*1000) WHERE upload_time=
 
 ALTER TABLE execution_logs DROP PRIMARY KEY;
 ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
-ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt)
+ALTER TABLE execution_logs ADD INDEX log_attempt (exec_id, name, attempt);
 
 ALTER TABLE schedules ADD COLUMN enc_type TINYINT;
 ALTER TABLE schedules ADD COLUMN schedule_options LONGBLOB;
 
+ALTER TABLE schedules DROP PRIMARY KEY;
+ALTER TABLE schedules ADD COLUMN schedule_id INT PRIMARY KEY NOT NULL AUTO_INCREMENT;
+ALTER TABLE schedules ADD INDEX project_id (project_id, flow_name);
+
 ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);
 
 ALTER TABLE projects ADD COLUMN enc_type TINYINT;
diff --git a/src/web/js/#azkaban.schedule.svg.js# b/src/web/js/#azkaban.schedule.svg.js#
new file mode 100644
index 0000000..2e7ea56
--- /dev/null
+++ b/src/web/js/#azkaban.schedule.svg.js#
@@ -0,0 +1,342 @@
+$.namespace('azkaban');
+function removeSched(projectId, flowName) {
+	var scheduleURL = contextURL + "/schedule"
+	var redirectURL = contextURL + "/schedule"
+	$.post(
+			scheduleURL,
+			{"action":"removeSched", "projectId":projectId, "flowName":flowName},
+			function(data) {
+				if (data.error) {
+//                 alert(data.error)
+					$('#errorMsg').text(data.error);
+				}
+				else {
+// 		 alert("Schedule "+schedId+" removed!")
+					window.location = redirectURL;
+				}
+			},
+			"json"
+	)
+}
+
+function removeSla(projectId, flowName) {
+	var scheduleURL = contextURL + "/schedule"
+	var redirectURL = contextURL + "/schedule"
+	$.post(
+			scheduleURL,
+			{"action":"removeSla", "projectId":projectId, "flowName":flowName},
+			function(data) {
+				if (data.error) {
+//                 alert(data.error)
+					$('#errorMsg').text(data.error)
+				}
+				else {
+// 		 alert("Schedule "+schedId+" removed!")
+					window.location = redirectURL
+				}
+			},
+			"json"
+	)
+}
+
+azkaban.ChangeSlaView = Backbone.View.extend({
+	events : {
+		"click" : "closeEditingTarget",
+		"click #set-sla-btn": "handleSetSla",	
+		"click #remove-sla-btn": "handleRemoveSla",
+		"click #sla-cancel-btn": "handleSlaCancel",
+		"click .modal-close": "handleSlaCancel",
+		"click #addRow": "handleAddRow"
+	},
+	initialize: function(setting) {
+
+	},
+	handleSlaCancel: function(evt) {
+		console.log("Clicked cancel button");
+		var scheduleURL = contextURL + "/schedule";
+
+		$('#slaModalBackground').hide();
+		$('#sla-options').hide();
+		
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		var rows = tFlowRules.rows;
+		var rowLength = rows.length
+		for(var i = 0; i < rowLength-1; i++) {
+			tFlowRules.deleteRow(0);
+		}
+		
+	},
+	initFromSched: function(projId, flowName) {
+		this.projectId = projId;
+		this.flowName = flowName;
+		
+		var scheduleURL = contextURL + "/schedule"
+		this.scheduleURL = scheduleURL;
+		var indexToName = {};
+		var nameToIndex = {};
+		var indexToText = {};
+		this.indexToName = indexToName;
+		this.nameToIndex = nameToIndex;
+		this.indexToText = indexToText;
+		var ruleBoxOptions = ["SUCCESS", "FINISH"];
+		this.ruleBoxOptions = ruleBoxOptions;
+		
+		var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
+		
+		$.get(
+				this.scheduleURL,
+				fetchScheduleData,
+				function(data) {
+					if (data.error) {
+						alert(data.error);
+					}
+					else {
+						if (data.slaEmails) {
+							$('#slaEmails').val(data.slaEmails.join());
+						}
+						
+						var allJobNames = data.allJobNames;
+						
+						indexToName[0] = "";
+						nameToIndex[flowName] = 0;
+						indexToText[0] = "flow " + flowName;
+						for(var i = 1; i <= allJobNames.length; i++) {
+							indexToName[i] = allJobNames[i-1];
+							nameToIndex[allJobNames[i-1]] = i;
+							indexToText[i] = "job " + allJobNames[i-1];
+						}
+						
+						
+						
+						
+						
+						// populate with existing settings
+						if(data.settings) {
+							
+							var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+							
+							for(var setting in data.settings) {
+								var rFlowRule = tFlowRules.insertRow(0);
+								
+								var cId = rFlowRule.insertCell(-1);
+								var idSelect = document.createElement("select");
+								for(var i in indexToName) {
+									idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+									if(data.settings[setting].id == indexToName[i]) {
+										idSelect.options[i].selected = true;
+									}
+								}								
+								cId.appendChild(idSelect);
+								
+								var cRule = rFlowRule.insertCell(-1);
+								var ruleSelect = document.createElement("select");
+								for(var i in ruleBoxOptions) {
+									ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+									if(data.settings[setting].rule == ruleBoxOptions[i]) {
+										ruleSelect.options[i].selected = true;
+									}
+								}
+								cRule.appendChild(ruleSelect);
+								
+								var cDuration = rFlowRule.insertCell(-1);
+								var duration = document.createElement("input");
+								duration.type = "text";
+								duration.setAttribute("class", "durationpick");
+								var rawMinutes = data.settings[setting].duration;
+								var intMinutes = rawMinutes.substring(0, rawMinutes.length-1);
+								var minutes = parseInt(intMinutes);
+								var hours = Math.floor(minutes / 60);
+								minutes = minutes % 60;
+								duration.value = hours + ":" + minutes;
+								cDuration.appendChild(duration);
+
+								var cEmail = rFlowRule.insertCell(-1);
+								var emailCheck = document.createElement("input");
+								emailCheck.type = "checkbox";
+								for(var act in data.settings[setting].actions) {
+									if(data.settings[setting].actions[act] == "EMAIL") {
+										emailCheck.checked = true;
+									}
+								}
+								cEmail.appendChild(emailCheck);
+								
+								var cKill = rFlowRule.insertCell(-1);
+								var killCheck = document.createElement("input");
+								killCheck.type = "checkbox";
+								for(var act in data.settings[setting].actions) {
+									if(data.settings[setting].actions[act] == "KILL") {
+										killCheck.checked = true;
+									}
+								}
+								cKill.appendChild(killCheck);
+								
+								$('.durationpick').timepicker({hourMax: 99});
+							}
+						}
+						$('.durationpick').timepicker({hourMax: 99});
+					}
+				},
+				"json"
+			);
+		
+		$('#slaModalBackground').show();
+		$('#sla-options').show();
+		
+//		this.schedFlowOptions = sched.flowOptions
+		console.log("Loaded schedule info. Ready to set SLA.");
+
+	},
+	handleRemoveSla: function(evt) {
+		console.log("Clicked remove sla button");
+		var scheduleURL = this.scheduleURL;
+		var redirectURL = this.scheduleURL;
+		$.post(
+				scheduleURL,
+				{"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
+				function(data) {
+				if (data.error) {
+						$('#errorMsg').text(data.error)
+					}
+					else {
+						window.location = redirectURL
+					}
+				"json"
+				}
+			);
+
+	},
+	handleSetSla: function(evt) {
+
+		var slaEmails = $('#slaEmails').val();
+		var settings = {};
+		
+		
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		for(var row = 0; row < tFlowRules.rows.length-1; row++) {
+			var rFlowRule = tFlowRules.rows[row];
+			var id = rFlowRule.cells[0].firstChild.value;
+			var rule = rFlowRule.cells[1].firstChild.value;
+			var duration = rFlowRule.cells[2].firstChild.value;
+			var email = rFlowRule.cells[3].firstChild.checked;
+			var kill = rFlowRule.cells[4].firstChild.checked;
+			settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill; 
+		}
+
+		var slaData = {
+			projectId: this.projectId,
+			flowName: this.flowName,
+			ajax: "setSla",			
+			slaEmails: slaEmails,
+			settings: settings
+		};
+
+		var scheduleURL = this.scheduleURL;
+		
+		$.post(
+			scheduleURL,
+			slaData,
+			function(data) {
+				if (data.error) {
+					alert(data.error);
+				}
+				else {
+					tFlowRules.length = 0;
+					window.location = scheduleURL;
+				}
+			},
+			"json"
+		);
+	},
+	handleAddRow: function(evt) {
+		
+		var indexToName = this.indexToName;
+		var nameToIndex = this.nameToIndex;
+		var indexToText = this.indexToText;
+		var ruleBoxOptions = this.ruleBoxOptions;
+
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		var rFlowRule = tFlowRules.insertRow(tFlowRules.rows.length-1);
+		
+		var cId = rFlowRule.insertCell(-1);
+		var idSelect = document.createElement("select");
+		for(var i in indexToName) {
+			idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+		}
+		
+		cId.appendChild(idSelect);
+		
+		var cRule = rFlowRule.insertCell(-1);
+		var ruleSelect = document.createElement("select");
+		for(var i in ruleBoxOptions) {
+			ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+		}
+		cRule.appendChild(ruleSelect);
+		
+		var cDuration = rFlowRule.insertCell(-1);
+		var duration = document.createElement("input");
+		duration.type = "text";
+		duration.setAttribute("class", "durationpick");
+		cDuration.appendChild(duration);
+
+		var cEmail = rFlowRule.insertCell(-1);
+		var emailCheck = document.createElement("input");
+		emailCheck.type = "checkbox";
+		cEmail.appendChild(emailCheck);
+		
+		var cKill = rFlowRule.insertCell(-1);
+		var killCheck = document.createElement("input");
+		killCheck.type = "checkbox";
+		cKill.appendChild(killCheck);
+		
+		$('.durationpick').timepicker({hourMax: 99});
+
+		return rFlowRule;
+	},
+	handleEditColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+	
+		if (this.editingTarget != curTarget) {
+			this.closeEditingTarget();
+			
+			var text = $(curTarget).children(".spanValue").text();
+			$(curTarget).empty();
+						
+			var input = document.createElement("input");
+			$(input).attr("type", "text");
+			$(input).css("width", "100%");
+			$(input).val(text);
+			$(curTarget).addClass("editing");
+			$(curTarget).append(input);
+			$(input).focus();
+			this.editingTarget = curTarget;
+		}
+	},
+	handleRemoveColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+		// Should be the table
+		var row = curTarget.parentElement.parentElement;
+		$(row).remove();
+	},
+	closeEditingTarget: function(evt) {
+
+	}
+});
+
+var slaView;
+var tableSorterView;
+$(function() {
+	var selected;
+
+
+	slaView = new azkaban.ChangeSlaView({el:$('#sla-options')});
+	tableSorterView = new azkaban.TableSorter({el:$('#scheduledFlowsTbl')});
+//	var requestURL = contextURL + "/manager";
+
+	// Set up the Flow options view. Create a new one every time :p
+//	 $('#addSlaBtn').click( function() {
+//		 slaView.show();
+//	 });
+
+	 
+	
+});
\ No newline at end of file
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index a6f1e83..f507dc1 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -1,12 +1,12 @@
 $.namespace('azkaban');
 
 
-function removeSched(projectId, flowName) {
+function removeSched(scheduleId) {
 	var scheduleURL = contextURL + "/schedule"
 	var redirectURL = contextURL + "/schedule"
 	$.post(
 			scheduleURL,
-			{"action":"removeSched", "projectId":projectId, "flowName":flowName},
+			{"action":"removeSched", "scheduleId":scheduleId},
 			function(data) {
 				if (data.error) {
 //                 alert(data.error)
@@ -21,12 +21,12 @@ function removeSched(projectId, flowName) {
 	)
 }
 
-function removeSla(projectId, flowName) {
+function removeSla(scheduleId) {
 	var scheduleURL = contextURL + "/schedule"
 	var redirectURL = contextURL + "/schedule"
 	$.post(
 			scheduleURL,
-			{"action":"removeSla", "projectId":projectId, "flowName":flowName},
+			{"action":"removeSla", "scheduleId":scheduleId},
 			function(data) {
 				if (data.error) {
 //                 alert(data.error)
@@ -68,9 +68,8 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		}
 		
 	},
-	initFromSched: function(projId, flowName) {
-		this.projectId = projId;
-		this.flowName = flowName;
+	initFromSched: function(scheduleId, flowName) {
+		this.scheduleId = scheduleId;
 		
 		var scheduleURL = contextURL + "/schedule"
 		this.scheduleURL = scheduleURL;
@@ -83,7 +82,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		var ruleBoxOptions = ["SUCCESS", "FINISH"];
 		this.ruleBoxOptions = ruleBoxOptions;
 		
-		var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
+		var fetchScheduleData = {"scheduleId": this.scheduleId, "ajax":"slaInfo"};
 		
 		$.get(
 				this.scheduleURL,
@@ -194,7 +193,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		var redirectURL = this.scheduleURL;
 		$.post(
 				scheduleURL,
-				{"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
+				{"action":"removeSla", "scheduleId":this.scheduleId},
 				function(data) {
 				if (data.error) {
 						$('#errorMsg').text(data.error)
@@ -225,8 +224,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		}
 
 		var slaData = {
-			projectId: this.projectId,
-			flowName: this.flowName,
+			scheduleId: this.scheduleId,
 			ajax: "setSla",			
 			slaEmails: slaEmails,
 			settings: settings
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 7ead267..2a7245c 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -127,12 +127,12 @@ public class JdbcScheduleLoaderTest {
 		slaOptions.setSlaEmails(emails);
 		slaOptions.setSettings(slaSets);
 		
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
-		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
+		Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 		
 		loader.insertSchedule(s1);
 		loader.insertSchedule(s2);
@@ -194,14 +194,14 @@ public class JdbcScheduleLoaderTest {
 		
 		System.out.println("the flow options are " + flowOptions);
 		System.out.println("the sla options are " + slaOptions);
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 
 		loader.insertSchedule(s1);
 		
 		emails.add("email3");
 		slaOptions.setSlaEmails(emails);
 		
-		Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
 
 		loader.updateSchedule(s2);
 		
@@ -253,7 +253,7 @@ public class JdbcScheduleLoaderTest {
 			slaOptions.setSlaEmails(emails);
 			slaOptions.setSettings(slaSets);
 			
-			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+			Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 			schedules.add(s);
 			try {
 				loader.insertSchedule(s);