azkaban-developers

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index dcf4de2..53dcf34 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -35,7 +35,7 @@ public class ExecutableFlow {
 	private boolean submitted = false;
 	
 	public enum Status {
-		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, PAUSED, UNKNOWN
+		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN, PAUSED
 	}
 	
 	public ExecutableFlow(String id, Flow flow) {
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 09628b4..e27a65d 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -82,8 +82,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 	private void createLogger() {
 		// Create logger
-		String loggerName = System.currentTimeMillis() + "."
-				+ flow.getExecutionId();
+		String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
 		logger = Logger.getLogger(loggerName);
 
 		// Create file appender
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 01ee65e..ee86bab 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -41,6 +41,7 @@ import azkaban.utils.Props;
  */
 public class FileProjectManager implements ProjectManager {
 	public static final String DIRECTORY_PARAM = "file.project.loader.path";
+	private static final String DELETED_PROJECT_PREFIX = ".DELETED.";
 	private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
 	private static final String PROPERTIES_FILENAME = "project.json";
 	private static final String PROJECT_DIRECTORY = "src";
@@ -99,6 +100,9 @@ public class FileProjectManager implements ProjectManager {
 			if (!dir.isDirectory()) {
 				logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory.");
 			} 
+			else if (dir.getName().startsWith(DELETED_PROJECT_PREFIX)) {
+				continue;
+			}
 			else {
 				File propertiesFile = new File(dir, PROPERTIES_FILENAME);
 				if (!propertiesFile.exists()) {
@@ -435,10 +439,25 @@ public class FileProjectManager implements ProjectManager {
 	}
 
 	@Override
-	public synchronized Project removeProject(String projectName) {
-		return null;
+	public synchronized Project removeProject(String projectName) throws ProjectManagerException {
+		Project project = this.getProject(projectName);
+		
+		if (project == null) {
+			throw new ProjectManagerException("Project " + projectName + " doesn't exist.");
+		}
+		
+		File projectPath = new File(projectDirectory, projectName);
+		File deletedProjectPath = new File(projectDirectory, DELETED_PROJECT_PREFIX + System.currentTimeMillis() + "." + projectName);
+		if (projectPath.exists()) {
+			if (!projectPath.renameTo(deletedProjectPath)) {
+				throw new ProjectManagerException("Deleting of project failed.");
+			}
+		}
+		
+		projects.remove(projectName);
+		return project;
 	}
-
+	
 	private static class SuffixFilter implements FileFilter {
 		private String suffix;
 
diff --git a/src/java/azkaban/project/ProjectLogger.java b/src/java/azkaban/project/ProjectLogger.java
new file mode 100644
index 0000000..5d6760f
--- /dev/null
+++ b/src/java/azkaban/project/ProjectLogger.java
@@ -0,0 +1,62 @@
+package azkaban.project;
+
+import java.io.File;
+import java.util.logging.Level;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+/*
+ * 
+ */
+public class ProjectLogger {
+	
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+	private static final int TIME_TO_IDLE_SECS = 30 * 60; // 30 mins
+	private static final String PROJECT_SUFFIX = ".PROJECT";
+	private CacheManager manager = CacheManager.create();
+	private Cache cache;
+	private String path;
+
+	public ProjectLogger(String projectPath) {
+		CacheConfiguration config = new CacheConfiguration();
+		config.setName("loggerCache");
+		config.setTimeToLiveSeconds(TIME_TO_IDLE_SECS);
+		config.eternal(false);
+		config.diskPersistent(false);
+		config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
+
+		cache = new Cache(config);
+		manager.addCache(cache);
+	}
+
+	public void log(Level level, String project, String message) {
+
+	}
+
+	private Logger getLogger(String projectId) {
+		Element element = cache.get(projectId);
+		Logger logger = Logger.getLogger(projectId + PROJECT_SUFFIX);
+
+//		if (element == null) {
+//			File file = new File(path);
+//			new File(file, );
+//			Appender appender = new FileAppender(DEFAULT_LAYOUT, , false);
+//			logger.addAppender(jobAppender);
+//		}
+//		else {
+//			Object obj = element.getObjectValue();
+//		}
+
+		return null;
+	}
+}
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
index 64d7654..733ee77 100644
--- a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -25,7 +25,6 @@ import org.joda.time.Seconds;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
-import azkaban.user.User;
 import azkaban.utils.Props;
 import azkaban.utils.JSONUtils;
 
@@ -51,10 +50,12 @@ import azkaban.utils.JSONUtils;
  */
 public class LocalFileScheduleLoader implements ScheduleLoader {
 	private static final String SCHEDULEID = "scheduleId";
+	private static final String PROJECTID = "projectId";
+	private static final String FLOWID = "flowId";
 	private static final String USER = "user";
 	private static final String USERSUBMIT = "userSubmit";
-    private static final String SUBMITTIME = "submitTime";
-    private static final String FIRSTSCHEDTIME = "firstSchedTime";
+	private static final String SUBMITTIME = "submitTime";
+	private static final String FIRSTSCHEDTIME = "firstSchedTime";
 	
 	private static final String SCHEDULE = "schedule";
 	private static final String NEXTEXECTIME = "nextExecTime";
@@ -111,79 +112,77 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
 
 	@Override
 	public List<ScheduledFlow> loadSchedule() {
-        if (scheduleFile != null && backupScheduleFile != null) {
-            if (scheduleFile.exists()) {
-            	if(scheduleFile.length() == 0)
-            		return new ArrayList<ScheduledFlow>();
+		if (scheduleFile != null && backupScheduleFile != null) {
+			if (scheduleFile.exists()) {
+				if (scheduleFile.length() == 0)
+					return new ArrayList<ScheduledFlow>();
 				return loadFromFile(scheduleFile);
-            }
-            else if (backupScheduleFile.exists()) {
-            	backupScheduleFile.renameTo(scheduleFile);
+			} else if (backupScheduleFile.exists()) {
+				backupScheduleFile.renameTo(scheduleFile);
 				return loadFromFile(scheduleFile);
-            }
-            else {
-                logger.warn("No schedule files found looking for " + scheduleFile.getAbsolutePath());
-            }
-        }
-        
-        return new ArrayList<ScheduledFlow>();
+			} else {
+				logger.warn("No schedule files found looking for "
+						+ scheduleFile.getAbsolutePath());
+			}
+		}
+
+		return new ArrayList<ScheduledFlow>();
 	}
 
 	@Override
 	public void saveSchedule(List<ScheduledFlow> schedule) {
-        if (scheduleFile != null && backupScheduleFile != null) {
-            // Delete the backup if it exists and a current file exists.
-            if (backupScheduleFile.exists() && scheduleFile.exists()) {
-            	backupScheduleFile.delete();
-            }
-
-            // Rename the schedule if it exists.
-            if (scheduleFile.exists()) {
-            	scheduleFile.renameTo(backupScheduleFile);
-            }
-
-            HashMap<String,Object> obj = new HashMap<String,Object>();
-            ArrayList<Object> schedules = new ArrayList<Object>();
-            obj.put(SCHEDULE, schedules);
-            //Write out schedule.
-                       
-            for (ScheduledFlow schedFlow : schedule) {
-            	schedules.add(createJSONObject(schedFlow));
-            }
- 
-    		try {
-    			FileWriter writer = new FileWriter(scheduleFile);
-    			writer.write(JSONUtils.toJSON(obj, true));
-    			writer.flush();
-    		} catch (Exception e) {
-    			throw new RuntimeException("Error saving flow file", e);
-    		}
-    		logger.info("schedule saved");
-        }
+		if (scheduleFile != null && backupScheduleFile != null) {
+			// Delete the backup if it exists and a current file exists.
+			if (backupScheduleFile.exists() && scheduleFile.exists()) {
+				backupScheduleFile.delete();
+			}
+
+			// Rename the schedule if it exists.
+			if (scheduleFile.exists()) {
+				scheduleFile.renameTo(backupScheduleFile);
+			}
+
+			HashMap<String, Object> obj = new HashMap<String, Object>();
+			ArrayList<Object> schedules = new ArrayList<Object>();
+			obj.put(SCHEDULE, schedules);
+			// Write out schedule.
+
+			for (ScheduledFlow schedFlow : schedule) {
+				schedules.add(createJSONObject(schedFlow));
+			}
+
+			try {
+				FileWriter writer = new FileWriter(scheduleFile);
+				writer.write(JSONUtils.toJSON(obj, true));
+				writer.flush();
+			} catch (Exception e) {
+				throw new RuntimeException("Error saving flow file", e);
+			}
+			logger.info("schedule saved");
+		}
 	}
 	
     @SuppressWarnings("unchecked")
-	private List<ScheduledFlow> loadFromFile(File schedulefile)
-    {
-    	BufferedReader reader = null;
+	private List<ScheduledFlow> loadFromFile(File schedulefile) {
+		BufferedReader reader = null;
 		try {
 			reader = new BufferedReader(new FileReader(schedulefile));
 		} catch (FileNotFoundException e) {
 			// TODO Auto-generated catch block
 			logger.error("Error loading schedule file ", e);
 		}
-    	List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
-    	
+		List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
+
 		HashMap<String, Object> schedule;
 		try {
-			//TODO handle first time empty schedule file
-			schedule = (HashMap<String,Object>)JSONUtils.parseJSONFromReader(reader);
+			// TODO handle first time empty schedule file
+			schedule = (HashMap<String, Object>) JSONUtils
+					.parseJSONFromReader(reader);
 		} catch (Exception e) {
-			//schedule = loadLegacyFile(schedulefile);
+			// schedule = loadLegacyFile(schedulefile);
 			logger.error("Error parsing the schedule file", e);
 			throw new RuntimeException("Error parsing the schedule file", e);
-		}
-		finally {
+		} finally {
 			try {
 				reader.close();
 			} catch (IOException e) {
@@ -200,178 +199,183 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
 		}
 		
 		return scheduleList;
-    }
-
-    private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
-    	String scheduleId = (String)obj.get(SCHEDULEID);
-    	String user = (String)obj.get(USER);
-    	String userSubmit = (String)obj.get(USERSUBMIT);
-    	String submitTimeRaw = (String)obj.get(SUBMITTIME);
-    	String firstSchedTimeRaw = (String)obj.get(FIRSTSCHEDTIME);
-    	String nextExecTimeRaw = (String)obj.get(NEXTEXECTIME);
-    	String timezone = (String)obj.get(TIMEZONE);
-    	String recurrence = (String)obj.get(RECURRENCE);
-//    	String scheduleStatus = (String)obj.get(SCHEDULESTATUS);
-    	
-    	DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);
-    	DateTime submitTime = FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
-    	DateTime firstSchedTime = FILE_DATEFORMAT.parseDateTime(firstSchedTimeRaw);
-
-    	if (nextExecTime == null) {
-    		logger.error("No next execution time has been set");
-    		return null;
-    	}
-    	
-    	if (submitTime == null) {
-    		logger.error("No submitTime has been set");
-    	}
-    	
-    	if(firstSchedTime == null){
-    		logger.error("No first scheduled time has been set");
-    	}
-    	
-    	if (timezone != null) {
-    		nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone.forID(timezone));
-    	}
-
-        ReadablePeriod period = null;
-    	if (recurrence != null) {
-    		period = parsePeriodString(scheduleId, recurrence);
-    	}
-
-    	ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
-    	if (scheduledFlow.updateTime()) {
-    		return scheduledFlow;
-    	}
-    	
-    	logger.info("Removed " + scheduleId + " off out of scheduled. It is not recurring.");
-    	return null;
-    }
-    
-	private HashMap<String,Object> createJSONObject(ScheduledFlow flow) {
-    	HashMap<String,Object> object = new HashMap<String,Object>();
-    	object.put(SCHEDULEID, flow.getScheduleId());
-    	object.put(USER, flow.getUser());
-    	object.put(USERSUBMIT, flow.getUserSubmit());
-   
-    	object.put(SUBMITTIME, FILE_DATEFORMAT.print(flow.getSubmitTime()));
-    	object.put(FIRSTSCHEDTIME, FILE_DATEFORMAT.print(flow.getFirstSchedTime()));
-    	
-    	object.put(NEXTEXECTIME, FILE_DATEFORMAT.print(flow.getNextExecTime()));
-    	object.put(TIMEZONE, flow.getNextExecTime().getZone().getID());
-    	object.put(RECURRENCE, createPeriodString(flow.getPeriod()));
-//    	object.put(SCHEDULESTATUS, flow.getSchedStatus());
-    	
-    	return object;
-    }
-    
-    private ReadablePeriod parsePeriodString(String scheduleId, String periodStr)
-    {
-        ReadablePeriod period;
-        char periodUnit = periodStr.charAt(periodStr.length() - 1);
-        if (periodUnit == 'n') {
-            return null;
-        }
-
-        int periodInt = Integer.parseInt(periodStr.substring(0, periodStr.length() - 1));
-        switch (periodUnit) {
-        	case 'M':
-        		period = Months.months(periodInt);
-        		break;
-        	case 'w':
-        		period = Weeks.weeks(periodInt);
-        		break;
-            case 'd':
-                period = Days.days(periodInt);
-                break;
-            case 'h':
-                period = Hours.hours(periodInt);
-                break;
-            case 'm':
-                period = Minutes.minutes(periodInt);
-                break;
-            case 's':
-                period = Seconds.seconds(periodInt);
-                break;
-            default:
-                throw new IllegalArgumentException("Invalid schedule period unit '" + periodUnit + "' for flow " + scheduleId);
-        }
-
-        return period;
-    }
-
-    private String createPeriodString(ReadablePeriod period)
-    {
-        String periodStr = "n";
-
-        if (period == null) {
-            return "n";
-        }
-
-        if (period.get(DurationFieldType.months()) > 0) {
-            int months = period.get(DurationFieldType.months());
-            periodStr = months + "M";
-        }
-        else if (period.get(DurationFieldType.weeks()) > 0) {
-            int weeks = period.get(DurationFieldType.weeks());
-            periodStr = weeks + "w";
-        }
-        else if (period.get(DurationFieldType.days()) > 0) {
-            int days = period.get(DurationFieldType.days());
-            periodStr = days + "d";
-        }
-        else if (period.get(DurationFieldType.hours()) > 0) {
-            int hours = period.get(DurationFieldType.hours());
-            periodStr = hours + "h";
-        }
-        else if (period.get(DurationFieldType.minutes()) > 0) {
-            int minutes = period.get(DurationFieldType.minutes());
-            periodStr = minutes + "m";
-        }
-        else if (period.get(DurationFieldType.seconds()) > 0) {
-            int seconds = period.get(DurationFieldType.seconds());
-            periodStr = seconds + "s";
-        }
-
-        return periodStr;
-    }
-
-//    private HashMap<String,Object> loadLegacyFile(File schedulefile) {
-//        Props schedule = null;
-//        try {
-//            schedule = new Props(null, schedulefile.getAbsolutePath());
-//        } catch(Exception e) {
-//            throw new RuntimeException("Error loading schedule from " + schedulefile);
-//        }
+	}
+
+	private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
+		String scheduleId = (String) obj.get(SCHEDULEID);
+		String projectId = (String) obj.get(PROJECTID);
+		String flowId = (String) obj.get(FLOWID);
+		String user = (String) obj.get(USER);
+		String userSubmit = (String) obj.get(USERSUBMIT);
+		String submitTimeRaw = (String) obj.get(SUBMITTIME);
+		String firstSchedTimeRaw = (String) obj.get(FIRSTSCHEDTIME);
+		String nextExecTimeRaw = (String) obj.get(NEXTEXECTIME);
+		String timezone = (String) obj.get(TIMEZONE);
+		String recurrence = (String) obj.get(RECURRENCE);
+		// String scheduleStatus = (String)obj.get(SCHEDULESTATUS);
+
+		DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);
+		DateTime submitTime = FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
+		DateTime firstSchedTime = FILE_DATEFORMAT
+				.parseDateTime(firstSchedTimeRaw);
+
+		if (nextExecTime == null) {
+			logger.error("No next execution time has been set");
+			return null;
+		}
+
+		if (submitTime == null) {
+			logger.error("No submitTime has been set");
+		}
+
+		if (firstSchedTime == null) {
+			logger.error("No first scheduled time has been set");
+		}
+
+		if (timezone != null) {
+			nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone
+					.forID(timezone));
+		}
+
+		ReadablePeriod period = null;
+		if (recurrence != null) {
+			period = parsePeriodString(scheduleId, recurrence);
+		}
+
+		ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
+		if (scheduledFlow.updateTime()) {
+			return scheduledFlow;
+		}
+
+		logger.info("Removed " + scheduleId
+				+ " off out of scheduled. It is not recurring.");
+		return null;
+	}
+
+	private HashMap<String, Object> createJSONObject(ScheduledFlow flow) {
+		HashMap<String, Object> object = new HashMap<String, Object>();
+		object.put(SCHEDULEID, flow.getScheduleId());
+		object.put(PROJECTID, flow.getProjectId());
+		object.put(FLOWID, flow.getFlowId());
+		object.put(USER, flow.getUser());
+		object.put(USERSUBMIT, flow.getUserSubmit());
+
+		object.put(SUBMITTIME, FILE_DATEFORMAT.print(flow.getSubmitTime()));
+		object.put(FIRSTSCHEDTIME,
+				FILE_DATEFORMAT.print(flow.getFirstSchedTime()));
+
+		object.put(NEXTEXECTIME, FILE_DATEFORMAT.print(flow.getNextExecTime()));
+		object.put(TIMEZONE, flow.getNextExecTime().getZone().getID());
+		object.put(RECURRENCE, createPeriodString(flow.getPeriod()));
+		// object.put(SCHEDULESTATUS, flow.getSchedStatus());
+
+		return object;
+	}
+
+	private ReadablePeriod parsePeriodString(String scheduleId, String periodStr) {
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0,
+				periodStr.length() - 1));
+		switch (periodUnit) {
+		case 'M':
+			period = Months.months(periodInt);
+			break;
+		case 'w':
+			period = Weeks.weeks(periodInt);
+			break;
+		case 'd':
+			period = Days.days(periodInt);
+			break;
+		case 'h':
+			period = Hours.hours(periodInt);
+			break;
+		case 'm':
+			period = Minutes.minutes(periodInt);
+			break;
+		case 's':
+			period = Seconds.seconds(periodInt);
+			break;
+		default:
+			throw new IllegalArgumentException("Invalid schedule period unit '"
+					+ periodUnit + "' for flow " + scheduleId);
+		}
+
+		return period;
+	}
+
+	private String createPeriodString(ReadablePeriod period) {
+		String periodStr = "n";
+
+		if (period == null) {
+			return "n";
+		}
+
+		if (period.get(DurationFieldType.months()) > 0) {
+			int months = period.get(DurationFieldType.months());
+			periodStr = months + "M";
+		} else if (period.get(DurationFieldType.weeks()) > 0) {
+			int weeks = period.get(DurationFieldType.weeks());
+			periodStr = weeks + "w";
+		} else if (period.get(DurationFieldType.days()) > 0) {
+			int days = period.get(DurationFieldType.days());
+			periodStr = days + "d";
+		} else if (period.get(DurationFieldType.hours()) > 0) {
+			int hours = period.get(DurationFieldType.hours());
+			periodStr = hours + "h";
+		} else if (period.get(DurationFieldType.minutes()) > 0) {
+			int minutes = period.get(DurationFieldType.minutes());
+			periodStr = minutes + "m";
+		} else if (period.get(DurationFieldType.seconds()) > 0) {
+			int seconds = period.get(DurationFieldType.seconds());
+			periodStr = seconds + "s";
+		}
+
+		return periodStr;
+	}
+
+//	private HashMap<String, Object> loadLegacyFile(File schedulefile) {
+//		Props schedule = null;
+//		try {
+//			schedule = new Props(null, schedulefile.getAbsolutePath());
+//		} catch (Exception e) {
+//			throw new RuntimeException("Error loading schedule from "
+//					+ schedulefile);
+//		}
 //
-//        ArrayList<Object> flowScheduleList = new ArrayList<Object>();
-//        for(String key: schedule.getKeySet()) {
-//        	HashMap<String,Object> scheduledMap = parseScheduledFlow(key, schedule.get(key));
-//        	if (scheduledMap == null) {
-//        		flowScheduleList.add(scheduledMap);
-//        	}
-//        }
+//		ArrayList<Object> flowScheduleList = new ArrayList<Object>();
+//		for (String key : schedule.getKeySet()) {
+//			HashMap<String, Object> scheduledMap = parseScheduledFlow(key,
+//					schedule.get(key));
+//			if (scheduledMap == null) {
+//				flowScheduleList.add(scheduledMap);
+//			}
+//		}
+//
+//		HashMap<String, Object> scheduleMap = new HashMap<String, Object>();
+//		scheduleMap.put(SCHEDULE, flowScheduleList);
+//
+//		return scheduleMap;
+//	}
+
+//	private HashMap<String, Object> parseScheduledFlow(String name, String flow) {
+//		String[] pieces = flow.split("\\s+");
 //
-//        HashMap<String,Object> scheduleMap = new HashMap<String,Object>();
-//        scheduleMap.put(SCHEDULE, flowScheduleList );
-//        
-//        return scheduleMap;
-//    }
-    
-//    private HashMap<String,Object> parseScheduledFlow(String name, String flow) {
-//        String[] pieces = flow.split("\\s+");
+//		if (pieces.length != 3) {
+//			logger.warn("Error loading schedule from file " + name);
+//			return null;
+//		}
 //
-//        if(pieces.length != 3) {
-//            logger.warn("Error loading schedule from file " + name);
-//            return null;
-//        }
+//		HashMap<String, Object> scheduledFlow = new HashMap<String, Object>();
+//		scheduledFlow.put(PROJECTID, name);
+//		scheduledFlow.put(TIME, pieces[0]);
+//		scheduledFlow.put(RECURRENCE, pieces[1]);
+//		Boolean dependency = Boolean.parseBoolean(pieces[2]);
 //
-//        HashMap<String,Object> scheduledFlow = new HashMap<String,Object>();
-//        scheduledFlow.put(PROJECTID, name);
-//        scheduledFlow.put(TIME, pieces[0]);
-//        scheduledFlow.put(RECURRENCE, pieces[1]);
-//        Boolean dependency = Boolean.parseBoolean(pieces[2]);
-//        
-//        return scheduledFlow;
-//    }
+//		return scheduledFlow;
+//	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduledFlow.java b/src/java/azkaban/scheduler/ScheduledFlow.java
index 0b1f486..1e92af2 100644
--- a/src/java/azkaban/scheduler/ScheduledFlow.java
+++ b/src/java/azkaban/scheduler/ScheduledFlow.java
@@ -19,10 +19,8 @@ package azkaban.scheduler;
 import org.joda.time.DateTime;
 import org.joda.time.ReadablePeriod;
 
-import azkaban.user.User;
 import azkaban.utils.Utils;
 
-
 /**
  * Schedule for a job instance. This is decoupled from the execution.
  * 
@@ -31,196 +29,207 @@ import azkaban.utils.Utils;
  */
 public class ScheduledFlow {
 
-	//use projectId.flowId to form a unique scheduleId
-	private final String scheduleId;	
-    
-    private final ReadablePeriod period;
-    private DateTime nextExecTime;
-    private final String user;
-    private final String userSubmit;
-    private final DateTime submitTime;
-    private final DateTime firstSchedTime;
-//    private SchedStatus schedStatus;
-    
-    public enum SchedStatus {
-    	LASTSUCCESS ("lastsuccess"), 
-    	LASTFAILED ("lastfailed"), 
-    	LASTPAUSED ("lastpaused");
-    	
-    	private final String status;
-    	SchedStatus(String status){
-    		this.status = status;
-    	}
-    	private String status(){
-    		return this.status;
-    	}
-    }
-
-    /**
-     * Constructor 
-     * 
-     * @param jobName Unique job name
-     * @param nextExecution The next execution time
-     * @param ignoreDependency 
-     */
-    public ScheduledFlow(String scheduleId,
-    					String user,
-    					String userSubmit,
-    					DateTime submitTime,
-    					DateTime firstSchedTime,
-                        DateTime nextExecution) {
-        this(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecution, null);
-    }
-
-    /**
-     * Constructor
-     * 
-     * @param jobId
-     * @param nextExecution
-     * @param period
-     * @param ignoreDependency
-     */
-    public ScheduledFlow(String scheduleId,
-    					String user,
-    					String userSubmit,
-    					DateTime submitTime,
-    					DateTime firstSchedTime,
-                        DateTime nextExecution,
-                        ReadablePeriod period) {
-        super();
-        this.scheduleId = Utils.nonNull(scheduleId);
-        this.user = user;
-        this.userSubmit = userSubmit;
-        this.submitTime = submitTime;
-        this.firstSchedTime = firstSchedTime;
-        this.period = period;
-        this.nextExecTime = Utils.nonNull(nextExecution);
-//        this.schedStatus = SchedStatus.LASTSUCCESS;
-    }
-    
-    public ScheduledFlow(String scheduleId, 
-    		String user, 
-    		String userSubmit,
+	// use projectId.flowId to form a unique scheduleId
+	private final String scheduleId;
+	private final String flowId;
+	private final String projectId;
+
+	private final ReadablePeriod period;
+	private DateTime nextExecTime;
+	private final String user;
+	private final String userSubmit;
+	private final DateTime submitTime;
+	private final DateTime firstSchedTime;
+
+	// private SchedStatus schedStatus;
+
+	public enum SchedStatus {
+		LASTSUCCESS("lastsuccess"), LASTFAILED("lastfailed"), LASTPAUSED(
+				"lastpaused");
+
+		private final String status;
+
+		SchedStatus(String status) {
+			this.status = status;
+		}
+
+		private String status() {
+			return this.status;
+		}
+	}
+
+	/**
+	 * Constructor
+	 * 
+	 * @param jobId
+	 * @param nextExecution
+	 * @param period
+	 * @param ignoreDependency
+	 */
+	public ScheduledFlow(
+			String scheduleId, 
+			String projectId, 
+			String flowId, 
+			String user, 
+			String userSubmit,
 			DateTime submitTime, 
 			DateTime firstSchedTime,
-			ReadablePeriod period) {
-    	super();
-        this.scheduleId = Utils.nonNull(scheduleId);
-        this.user = user;
-        this.userSubmit = userSubmit;
-        this.submitTime = submitTime;
-        this.firstSchedTime = firstSchedTime;
-        this.period = period;
-        this.nextExecTime = new DateTime(firstSchedTime);
-//        this.schedStatus = SchedStatus.LASTSUCCESS;
+			DateTime nextExecution, 
+			ReadablePeriod period) 
+	{
+		super();
+		this.scheduleId = Utils.nonNull(scheduleId);
+		this.projectId = Utils.nonNull(projectId);
+		this.flowId = Utils.nonNull(flowId);
+		this.user = user;
+		this.userSubmit = userSubmit;
+		this.submitTime = submitTime;
+		this.firstSchedTime = firstSchedTime;
+		this.period = period;
+		this.nextExecTime = Utils.nonNull(nextExecution);
+		// this.schedStatus = SchedStatus.LASTSUCCESS;
 	}
 
-    public ScheduledFlow(String scheduleId, 
-    		String user, 
-    		String userSubmit,
+	public ScheduledFlow(
+			String scheduleId, 
+			String projectId, 
+			String flowId, 
+			String user, 
+			String userSubmit,
+			DateTime submitTime, 
+			DateTime firstSchedTime,
+			ReadablePeriod period)
+	{
+		this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, new DateTime(), period);
+	}
+
+	public ScheduledFlow(
+			String scheduleId, 
+			String projectId, 
+			String flowId, 
+			String user, 
+			String userSubmit,
 			DateTime submitTime, 
 			DateTime firstSchedTime) {
-    	super();
-        this.scheduleId = Utils.nonNull(scheduleId);
-        this.user = user;
-        this.userSubmit = userSubmit;
-        this.submitTime = submitTime;
-        this.firstSchedTime = firstSchedTime;
-        this.period = null;
-        this.nextExecTime = new DateTime();
-//        this.schedStatus = SchedStatus.LASTSUCCESS;
+		this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, new DateTime(), null);
 	}
 
-//	public SchedStatus getSchedStatus() {
-//		return this.schedStatus;
-//	}
-//
-//	public void setSchedStatus(SchedStatus schedStatus) {
-//		this.schedStatus = schedStatus;
-//	}
+	/**
+	 * Constructor
+	 * 
+	 * @param jobName
+	 *            Unique job name
+	 * @param nextExecution
+	 *            The next execution time
+	 * @param ignoreDependency
+	 */
+	public ScheduledFlow(String scheduleId, 
+						String projectId,
+						String flowId,
+						String user, 
+						String userSubmit,
+						DateTime submitTime, 
+						DateTime firstSchedTime, 
+						DateTime nextExecution)
+	{
+		this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExecution, null);
+	}
+	
+	// public SchedStatus getSchedStatus() {
+	// return this.schedStatus;
+	// }
+	//
+	// public void setSchedStatus(SchedStatus schedStatus) {
+	// this.schedStatus = schedStatus;
+	// }
 
 	/**
-     * Updates the time to a future time after 'now' that matches the period description.
-     * 
-     * @return
-     */
-    public boolean updateTime() {
-    	if (nextExecTime.isAfterNow()) {
-    		return true;
-    	}
-    	
-        if (period != null) {
-        	DateTime other = getNextRuntime(nextExecTime, period);
-    		
-    		this.nextExecTime = other;
-    		return true;
-    	}
-
-        return false;
-    }
-    
-    /**
-     * Calculates the next runtime by adding the period.
-     * 
-     * @param scheduledDate
-     * @param period
-     * @return
-     */
-    private DateTime getNextRuntime(DateTime scheduledDate, ReadablePeriod period)
-    {
-        DateTime now = new DateTime();
-        DateTime date = new DateTime(scheduledDate);
-        int count = 0;
-        while (!now.isBefore(date)) {
-            if (count > 100000) {
-                throw new IllegalStateException("100000 increments of period did not get to present time.");
-            }
-
-            if (period == null) {
-                break;
-            }
-            else {
-                date = date.plus(period);
-            }
-
-            count += 1;
-        }
-
-        return date;
-    }
-    
-    /**
-     * Returns the unique id of the job to be run.
-     * @return
-     */
-
-    /**
-     * Returns true if the job recurrs in the future
-     * @return
-     */
-    public boolean isRecurring() {
-        return this.period != null;
-    }
-
-    /**
-     * Returns the recurrance period. Or null if not applicable
-     * @return
-     */
-    public ReadablePeriod getPeriod() {
-        return period;
-    }
+	 * Updates the time to a future time after 'now' that matches the period
+	 * description.
+	 * 
+	 * @return
+	 */
+	public boolean updateTime() {
+		if (nextExecTime.isAfterNow()) {
+			return true;
+		}
+
+		if (period != null) {
+			DateTime other = getNextRuntime(nextExecTime, period);
+
+			this.nextExecTime = other;
+			return true;
+		}
+
+		return false;
+	}
+
+	/**
+	 * Calculates the next runtime by adding the period.
+	 * 
+	 * @param scheduledDate
+	 * @param period
+	 * @return
+	 */
+	private DateTime getNextRuntime(DateTime scheduledDate,
+			ReadablePeriod period) {
+		DateTime now = new DateTime();
+		DateTime date = new DateTime(scheduledDate);
+		int count = 0;
+		while (!now.isBefore(date)) {
+			if (count > 100000) {
+				throw new IllegalStateException(
+						"100000 increments of period did not get to present time.");
+			}
+
+			if (period == null) {
+				break;
+			} else {
+				date = date.plus(period);
+			}
+
+			count += 1;
+		}
+
+		return date;
+	}
+
+	/**
+	 * Returns the unique id of the job to be run.
+	 * 
+	 * @return
+	 */
+
+	/**
+	 * Returns true if the job recurrs in the future
+	 * 
+	 * @return
+	 */
+	public boolean isRecurring() {
+		return this.period != null;
+	}
+
+	/**
+	 * Returns the recurrance period. Or null if not applicable
+	 * 
+	 * @return
+	 */
+	public ReadablePeriod getPeriod() {
+		return period;
+	}
 
 	public DateTime getFirstSchedTime() {
 		return firstSchedTime;
 	}
 
 	/**
-     * Returns the next scheduled execution
-     * @return
-     */
-    public DateTime getNextExecTime() {
-        return nextExecTime;
-    }
+	 * Returns the next scheduled execution
+	 * 
+	 * @return
+	 */
+	public DateTime getNextExecTime() {
+		return nextExecTime;
+	}
 
 	public String getUserSubmit() {
 		return userSubmit;
@@ -231,33 +240,29 @@ public class ScheduledFlow {
 	}
 
 	@Override
-    public String toString()
-    {
-        return "ScheduledFlow{" +
-//        	   "scheduleStatus=" + schedStatus +
-               "nextExecTime=" + nextExecTime +
-               ", period=" + period.toString() +
-               ", firstSchedTime=" + firstSchedTime +
-               ", submitTime=" + submitTime +
-               ", userSubmit=" + userSubmit +
-               ", user=" + user +
-               ", scheduleId='" + scheduleId + '\'' +
-               '}';
-    }
+	public String toString() {
+		return "ScheduledFlow{"
+				+
+				// "scheduleStatus=" + schedStatus +
+				"nextExecTime=" + nextExecTime + ", period=" + period
+				+ ", firstSchedTime=" + firstSchedTime + ", submitTime="
+				+ submitTime + ", userSubmit=" + userSubmit + ", user=" + user
+				+ ", scheduleId='" + scheduleId + '\'' + '}';
+	}
 
 	public String getUser() {
 		return user;
 	}
-	
+
 	public String getScheduleId() {
 		return scheduleId;
 	}
-	
-	public String getFlowId(){
-		return this.scheduleId.split("\\.")[1];
+
+	public String getFlowId() {
+		return flowId;
 	}
-	
-	public String getProjectId(){
-		return this.scheduleId.split("\\.")[0];
+
+	public String getProjectId() {
+		return projectId;
 	}
 }
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 12c9184..06d8ad4 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -31,380 +31,435 @@ import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.utils.Props;
 
-
-
 /**
- * The ScheduleManager stores and executes the schedule. It uses a single thread instead
- * and waits until correct loading time for the flow. It will not remove the flow from the
- * schedule when it is run, which can potentially allow the flow to and overlap each other.
- * 
- * @author Richard
+ * The ScheduleManager stores and executes the schedule. It uses a single thread
+ * instead and waits until correct loading time for the flow. It will not remove
+ * the flow from the schedule when it is run, which can potentially allow the
+ * flow to and overlap each other.
  */
 public class ScheduleManager {
 	private static Logger logger = Logger.getLogger(ScheduleManager.class);
 
-    private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+	private final DateTimeFormatter _dateFormat = DateTimeFormat
+			.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
 	private ScheduleLoader loader;
-    private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>(); 
-    private final ScheduleRunner runner;
-    private final ExecutorManager executorManager;
-    private final ProjectManager projectManager;
-    
-    /**
-     * Give the schedule manager a loader class that will properly load the schedule.
-     * 
-     * @param loader
-     */
-    public ScheduleManager(
-    		ExecutorManager executorManager,
-    		ProjectManager projectManager,
-    		ScheduleLoader loader) 
-    {
-    	this.executorManager = executorManager;
-    	this.projectManager = projectManager;
-    	this.loader = loader;
-    	this.runner = new ScheduleRunner();
-    	
-    	List<ScheduledFlow> scheduleList = loader.loadSchedule();
-    	for (ScheduledFlow flow: scheduleList) {
-    		internalSchedule(flow);
-    	}
-    	
-    	this.runner.start();
-    }
-    
-    /**
-     * Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
-     */
-    public void shutdown() {
-    	this.runner.shutdown();
-    }
-    
-    /**
-     * Retrieves a copy of the list of schedules.
-     * 
-     * @return
-     */
-    public synchronized List<ScheduledFlow> getSchedule() {
-    	return runner.getSchedule();
-    }
-
-    /**
-     * Returns the scheduled flow for the flow name
-     * 
-     * @param id
-     * @return
-     */
-    public ScheduledFlow getSchedule(String scheduleId) {
-    	return scheduleIDMap.get(scheduleId);
-    }
-    
-    /**
-     * Removes the flow from the schedule if it exists.
-     * 
-     * @param id
-     */
-    public synchronized void removeScheduledFlow(String scheduleId) {
-    	ScheduledFlow flow = scheduleIDMap.get(scheduleId);
-    	scheduleIDMap.remove(scheduleId);
-    	runner.removeScheduledFlow(flow);
-    	
-    	loader.saveSchedule(getSchedule());
-    }
-    
-//    public synchronized void pauseScheduledFlow(String scheduleId){
-//    	try{
-//        	ScheduledFlow flow = scheduleIDMap.get(scheduleId);
-//        	flow.setSchedStatus(SchedStatus.LASTPAUSED);
-//        	loader.saveSchedule(getSchedule());
-//    	}
-//    	catch (Exception e) {
-//    		throw new RuntimeException("Error pausing a schedule " + scheduleId);
-//		}
-//    }
-//    
-//    public synchronized void resumeScheduledFlow(String scheduleId){
-//    	try {
-//    		ScheduledFlow flow = scheduleIDMap.get(scheduleId);
-//        	flow.setSchedStatus(SchedStatus.LASTSUCCESS);
-//        	loader.saveSchedule(getSchedule());			
-//		} 
-//    	catch (Exception e) {
-//			throw new RuntimeException("Error resuming a schedule " + scheduleId);
-//		}
-//    }
-    
-    public void schedule(final String scheduleId,
-    		final String user,
-    		final String userSubmit,
-    		final DateTime submitTime,
-    		final DateTime firstSchedTime,
-    		final ReadablePeriod period
-            ) {
-    	//TODO: should validate projectId and flowId?
-		logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime)
-		+ " with a period of " + PeriodFormat.getDefault().print(period));
-		schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, period));
+	private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
+	private final ScheduleRunner runner;
+	private final ExecutorManager executorManager;
+	private final ProjectManager projectManager;
+
+	/**
+	 * Give the schedule manager a loader class that will properly load the
+	 * schedule.
+	 * 
+	 * @param loader
+	 */
+	public ScheduleManager(ExecutorManager executorManager,
+							ProjectManager projectManager, 
+							ScheduleLoader loader) 
+	{
+		this.executorManager = executorManager;
+		this.projectManager = projectManager;
+		this.loader = loader;
+		this.runner = new ScheduleRunner();
+
+		List<ScheduledFlow> scheduleList = loader.loadSchedule();
+		for (ScheduledFlow flow : scheduleList) {
+			internalSchedule(flow);
+		}
+
+		this.runner.start();
+	}
+
+	/**
+	 * Shutdowns the scheduler thread. After shutdown, it may not be safe to use
+	 * it again.
+	 */
+	public void shutdown() {
+		this.runner.shutdown();
 	}
-    
-    /**
-     * Schedule the flow
-     * @param flowId
-     * @param date
-     * @param ignoreDep
-     */
-    public void schedule(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime) {
-        logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
-        schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime));
-    }
-    
-    /**
-     * Schedules the flow, but doesn't save the schedule afterwards.
-     * @param flow
-     */
-    private synchronized void internalSchedule(ScheduledFlow flow) {
-    	ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
-    	flow.updateTime();
-    	if (existing != null) {
-    		this.runner.removeScheduledFlow(existing);
-    	}
-    	
+
+	/**
+	 * Retrieves a copy of the list of schedules.
+	 * 
+	 * @return
+	 */
+	public synchronized List<ScheduledFlow> getSchedule() {
+		return runner.getSchedule();
+	}
+
+	/**
+	 * Returns the scheduled flow for the flow name
+	 * 
+	 * @param id
+	 * @return
+	 */
+	public ScheduledFlow getSchedule(String scheduleId) {
+		return scheduleIDMap.get(scheduleId);
+	}
+
+	/**
+	 * Removes the flow from the schedule if it exists.
+	 * 
+	 * @param id
+	 */
+	public synchronized void removeScheduledFlow(String scheduleId) {
+		ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+		scheduleIDMap.remove(scheduleId);
+		runner.removeScheduledFlow(flow);
+
+		loader.saveSchedule(getSchedule());
+	}
+
+	// public synchronized void pauseScheduledFlow(String scheduleId){
+	// try{
+	// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+	// flow.setSchedStatus(SchedStatus.LASTPAUSED);
+	// loader.saveSchedule(getSchedule());
+	// }
+	// catch (Exception e) {
+	// throw new RuntimeException("Error pausing a schedule " + scheduleId);
+	// }
+	// }
+	//
+	// public synchronized void resumeScheduledFlow(String scheduleId){
+	// try {
+	// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+	// flow.setSchedStatus(SchedStatus.LASTSUCCESS);
+	// loader.saveSchedule(getSchedule());
+	// }
+	// catch (Exception e) {
+	// throw new RuntimeException("Error resuming a schedule " + scheduleId);
+	// }
+	// }
+
+	public void schedule(
+			final String scheduleId, 
+			final String projectId,
+			final String flowId, 
+			final String user, 
+			final String userSubmit,
+			final DateTime submitTime, 
+			final DateTime firstSchedTime,
+			final ReadablePeriod period) {
+		logger.info("Scheduling flow '" + scheduleId + "' for "
+				+ _dateFormat.print(firstSchedTime) + " with a period of "
+				+ PeriodFormat.getDefault().print(period));
+		
+		schedule(new ScheduledFlow(scheduleId, projectId, flowId, user,
+				userSubmit, submitTime, firstSchedTime, period));
+	}
+
+	/**
+	 * Schedule the flow
+	 * 
+	 * @param flowId
+	 * @param date
+	 * @param ignoreDep
+	 */
+	public void schedule(
+			String scheduleId,
+			String projectId,
+			String flowId,
+			String user, 
+			String userSubmit,
+			DateTime submitTime,
+			DateTime firstSchedTime) 
+	{
+		logger.info("Scheduling flow '" + scheduleId + "' for "
+				+ _dateFormat.print(firstSchedTime));
+		schedule(new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime,
+				firstSchedTime));
+	}
+
+	/**
+	 * Schedules the flow, but doesn't save the schedule afterwards.
+	 * 
+	 * @param flow
+	 */
+	private synchronized void internalSchedule(ScheduledFlow flow) {
+		ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
+		flow.updateTime();
+		if (existing != null) {
+			this.runner.removeScheduledFlow(existing);
+		}
+
 		this.runner.addScheduledFlow(flow);
-    	scheduleIDMap.put(flow.getScheduleId(), flow);
-    }
-    
-    /**
-     * Adds a flow to the schedule.
-     * 
-     * @param flow
-     */
-    public synchronized void schedule(ScheduledFlow flow) {
-    	internalSchedule(flow);
-    	saveSchedule();
-    }
-    
-    /**
-     * Save the schedule
-     */
-    private void saveSchedule() {
-    	loader.saveSchedule(getSchedule());
-    }
-    
-    
-    /**
-     * Thread that simply invokes the running of flows when the schedule is
-     * ready.
-     * 
-     * @author Richard Park
-     *
-     */
-    public class ScheduleRunner extends Thread {
-    	private final PriorityBlockingQueue<ScheduledFlow> schedule;
-    	private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
-        	// Five minute minimum intervals
-    	private static final int TIMEOUT_MS = 300000;
-    	
-    	public ScheduleRunner() {
-    		schedule = new PriorityBlockingQueue<ScheduledFlow>(1, new ScheduleComparator());
-    	}
-    	
-    	public void shutdown() {
-    		logger.error("Shutting down scheduler thread");
-    		stillAlive.set(false);
-    		this.interrupt();
-    	}
-    	
-    	/**
-    	 * Return a list of scheduled flow
-    	 * @return
-    	 */
-    	public synchronized List<ScheduledFlow> getSchedule() {
-    		return new ArrayList<ScheduledFlow>(schedule);
-    	}
-    	
-    	/**
-    	 * Adds the flow to the schedule and then interrupts so it will update its wait time.
-    	 * @param flow
-    	 */
-        public synchronized void addScheduledFlow(ScheduledFlow flow) {
-        	logger.info("Adding " + flow + " to schedule.");
-        	schedule.add(flow);
-//            MonitorImpl.getInternalMonitorInterface().workflowEvent(null, 
-//                    System.currentTimeMillis(),
-//                    WorkflowAction.SCHEDULE_WORKFLOW, 
-//                    WorkflowState.NOP,
-//                    flow.getId());
-
-        	this.interrupt();
-        }
-
-        /**
-         * Remove scheduled flows. Does not interrupt.
-         * 
-         * @param flow
-         */
-        public synchronized void removeScheduledFlow(ScheduledFlow flow) {
-        	logger.info("Removing " + flow + " from the schedule.");
-        	schedule.remove(flow);
-//            MonitorImpl.getInternalMonitorInterface().workflowEvent(null, 
-//                    System.currentTimeMillis(),
-//                    WorkflowAction.UNSCHEDULE_WORKFLOW,
-//                    WorkflowState.NOP,
-//                    flow.getId());
-        	// Don't need to interrupt, because if this is originally on the top of the queue,
-        	// it'll just skip it.
-        }
-        
-        public void run() {
-        	while(stillAlive.get()) {
-        		synchronized (this) {
-        			try {
-        				//TODO clear up the exception handling
-        				ScheduledFlow schedFlow = schedule.peek();
-	    	    		
-	    	    		if (schedFlow == null) {
-	    	    			// If null, wake up every minute or so to see if there's something to do.
-	    	    			// Most likely there will not be.
-	    	    			try {
-	    	    				this.wait(TIMEOUT_MS);
-	    	    			} catch (InterruptedException e) {
-	    	    				// interruption should occur when items are added or removed from the queue.
-	    	    			}
-	    	    		}
-	    	    		else {
-	    	    			// We've passed the flow execution time, so we will run.
-	    	    			if (!schedFlow.getNextExecTime().isAfterNow()) {
-	    	    				// Run flow. The invocation of flows should be quick.
-	    	    				ScheduledFlow runningFlow = schedule.poll();
-	    	    				logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());
-	
-	    	    				// Execute the flow here
-	    	    				try {
-	    	    					Project project = projectManager.getProject(runningFlow.getProjectId());
-	    	    					if (project == null) {
-	    	    						logger.error("Scheduled Project "+runningFlow.getProjectId()+" does not exist!");
-	    	    						throw new RuntimeException("Error finding the scheduled project. " + runningFlow.getScheduleId());
-	    	    					}
-	    	    					
-	    	    					Flow flow = project.getFlow(runningFlow.getFlowId());
-	    	    					if (flow == null) {
-	    	    						logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
-	    	    						throw new RuntimeException("Error finding the scheduled flow. " + runningFlow.getScheduleId());
-	    	    					}
-	    	    					
-	    	    					HashMap<String, Props> sources;
-	    	    					try {
-	    	    						sources = projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
-	    	    					}
-	    	    					catch (ProjectManagerException e) {
-	    	    						logger.error(e.getMessage());
-	    	    						throw new RuntimeException("Error getting the flow resources. " + runningFlow.getScheduleId());
-	    	    					}
-	    	    				    	    					
-	    	    					// Create ExecutableFlow
-	    	    					ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
-	    	    					exflow.setSubmitUser(runningFlow.getUser());
-	    	    					//TODO make disabled in scheduled flow
-//	    	    					Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
-//	    	    					for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
-//	    	    						boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-//	    	    						exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
-//	    	    					}
-	    	    					
-	    	    					// Create directory
-	    	    					try {
-	    	    						executorManager.setupExecutableFlow(exflow);
-	    	    					} catch (ExecutorManagerException e) {
-	    	    						try {
-	    	    							executorManager.cleanupAll(exflow);
-	    	    						} catch (ExecutorManagerException e1) {
-	    	    							e1.printStackTrace();
-	    	    						}
-	    	    						logger.error(e.getMessage());
-	    	    						return;
-	    	    					}
-
-	    	    					// Copy files to the source.
-	    	    					File executionDir = new File(exflow.getExecutionPath());
-	    	    					try {
-	    	    						projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
-	    	    					} catch (ProjectManagerException e) {
-	    	    						try {
-	    	    							executorManager.cleanupAll(exflow);
-	    	    						} catch (ExecutorManagerException e1) {
-	    	    							e1.printStackTrace();
-	    	    						}
-	    	    						logger.error(e.getMessage());
-	    	    						return;
-	    	    					}
-	    	    					
-
-	    	    					try {
-	    	    						executorManager.executeFlow(exflow);
-	    	    					} catch (ExecutorManagerException e) {
-	    	    						try {
-	    	    							executorManager.cleanupAll(exflow);
-	    	    						} catch (ExecutorManagerException e1) {
-	    	    							e1.printStackTrace();
-	    	    						}
-	    	    						
-	    	    						logger.error(e.getMessage());
-	    	    						return;
-	    	    					}
-	    	    				} catch (JobExecutionException e) {
-	    	    					logger.info("Could not run flow. " + e.getMessage());
-	    	    				}
-	    	    	        	schedule.remove(runningFlow);
-	    	    				
-	    	    				// Immediately reschedule if it's possible. Let the execution manager
-	    	    				// handle any duplicate runs.
-	    	    				if (runningFlow.updateTime()) {
-	    	    					schedule.add(runningFlow);
-	    	    				}
-    	    					saveSchedule();
-	    	    			}
-	    	    			else {
-	    	    				// wait until flow run
-	    	    				long millisWait = Math.max(0, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
-	    	    				try {
-	    							this.wait(Math.min(millisWait, TIMEOUT_MS));
-	    						} catch (InterruptedException e) {
-	    							// interruption should occur when items are added or removed from the queue.
-	    						}
-	    	    			}
-	    	    		}
-        			}
-        			catch (Exception e) {
-        				logger.error("Unexpected exception has been thrown in scheduler", e);
-        			}
-        			catch (Throwable e) {
-        				logger.error("Unexpected throwable has been thrown in scheduler", e);
-        			}
-        		}
-        	}
-        }
-        
-        /**
-         * Class to sort the schedule based on time.
-         * 
-         * @author Richard Park
-         */
-        private class ScheduleComparator implements Comparator<ScheduledFlow>{
-    		@Override
-    		public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
-    			DateTime first = arg1.getNextExecTime();
-    			DateTime second = arg0.getNextExecTime();
-    			
-    			if (first.isEqual(second)) {
-    				return 0;
-    			}
-    			else if (first.isBefore(second)) {
-    				return 1;
-    			}
-    			
-    			return -1;
-    		}	
-        }
-    }
+		scheduleIDMap.put(flow.getScheduleId(), flow);
+	}
+
+	/**
+	 * Adds a flow to the schedule.
+	 * 
+	 * @param flow
+	 */
+	public synchronized void schedule(ScheduledFlow flow) {
+		internalSchedule(flow);
+		saveSchedule();
+	}
+
+	/**
+	 * Save the schedule
+	 */
+	private void saveSchedule() {
+		loader.saveSchedule(getSchedule());
+	}
+
+	/**
+	 * Thread that simply invokes the running of flows when the schedule is
+	 * ready.
+	 * 
+	 * @author Richard Park
+	 * 
+	 */
+	public class ScheduleRunner extends Thread {
+		private final PriorityBlockingQueue<ScheduledFlow> schedule;
+		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+		// Five minute minimum intervals
+		private static final int TIMEOUT_MS = 300000;
+
+		public ScheduleRunner() {
+			schedule = new PriorityBlockingQueue<ScheduledFlow>(1,
+					new ScheduleComparator());
+		}
+
+		public void shutdown() {
+			logger.error("Shutting down scheduler thread");
+			stillAlive.set(false);
+			this.interrupt();
+		}
+
+		/**
+		 * Return a list of scheduled flow
+		 * 
+		 * @return
+		 */
+		public synchronized List<ScheduledFlow> getSchedule() {
+			return new ArrayList<ScheduledFlow>(schedule);
+		}
+
+		/**
+		 * Adds the flow to the schedule and then interrupts so it will update
+		 * its wait time.
+		 * 
+		 * @param flow
+		 */
+		public synchronized void addScheduledFlow(ScheduledFlow flow) {
+			logger.info("Adding " + flow + " to schedule.");
+			schedule.add(flow);
+			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+			// System.currentTimeMillis(),
+			// WorkflowAction.SCHEDULE_WORKFLOW,
+			// WorkflowState.NOP,
+			// flow.getId());
+
+			this.interrupt();
+		}
+
+		/**
+		 * Remove scheduled flows. Does not interrupt.
+		 * 
+		 * @param flow
+		 */
+		public synchronized void removeScheduledFlow(ScheduledFlow flow) {
+			logger.info("Removing " + flow + " from the schedule.");
+			schedule.remove(flow);
+			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+			// System.currentTimeMillis(),
+			// WorkflowAction.UNSCHEDULE_WORKFLOW,
+			// WorkflowState.NOP,
+			// flow.getId());
+			// Don't need to interrupt, because if this is originally on the top
+			// of the queue,
+			// it'll just skip it.
+		}
+
+		public void run() {
+			while (stillAlive.get()) {
+				synchronized (this) {
+					try {
+						// TODO clear up the exception handling
+						ScheduledFlow schedFlow = schedule.peek();
+
+						if (schedFlow == null) {
+							// If null, wake up every minute or so to see if
+							// there's something to do.
+							// Most likely there will not be.
+							try {
+								this.wait(TIMEOUT_MS);
+							} catch (InterruptedException e) {
+								// interruption should occur when items are
+								// added or removed from the queue.
+							}
+						} else {
+							// We've passed the flow execution time, so we will
+							// run.
+							if (!schedFlow.getNextExecTime().isAfterNow()) {
+								// Run flow. The invocation of flows should be
+								// quick.
+								ScheduledFlow runningFlow = schedule.poll();
+								logger.info("Scheduler attempting to run "
+										+ runningFlow.getScheduleId());
+
+								// Execute the flow here
+								try {
+									Project project = projectManager
+											.getProject(runningFlow
+													.getProjectId());
+									if (project == null) {
+										logger.error("Scheduled Project "
+												+ runningFlow.getProjectId()
+												+ " does not exist!");
+										throw new RuntimeException(
+												"Error finding the scheduled project. "
+														+ runningFlow
+																.getScheduleId());
+									}
+
+									Flow flow = project.getFlow(runningFlow
+											.getFlowId());
+									if (flow == null) {
+										logger.error("Flow "
+												+ runningFlow.getFlowId()
+												+ " cannot be found in project "
+												+ project.getName());
+										throw new RuntimeException(
+												"Error finding the scheduled flow. "
+														+ runningFlow
+																.getScheduleId());
+									}
+
+									HashMap<String, Props> sources;
+									try {
+										sources = projectManager
+												.getAllFlowProperties(project,
+														runningFlow.getFlowId());
+									} catch (ProjectManagerException e) {
+										logger.error(e.getMessage());
+										throw new RuntimeException(
+												"Error getting the flow resources. "
+														+ runningFlow
+																.getScheduleId());
+									}
+
+									// Create ExecutableFlow
+									ExecutableFlow exflow = executorManager
+											.createExecutableFlow(flow);
+									exflow.setSubmitUser(runningFlow.getUser());
+									// TODO make disabled in scheduled flow
+									// Map<String, String> paramGroup =
+									// this.getParamGroup(req, "disabled");
+									// for (Map.Entry<String, String> entry:
+									// paramGroup.entrySet()) {
+									// boolean nodeDisabled =
+									// Boolean.parseBoolean(entry.getValue());
+									// exflow.setStatus(entry.getKey(),
+									// nodeDisabled ? Status.DISABLED :
+									// Status.READY);
+									// }
+
+									// Create directory
+									try {
+										executorManager
+												.setupExecutableFlow(exflow);
+									} catch (ExecutorManagerException e) {
+										try {
+											executorManager.cleanupAll(exflow);
+										} catch (ExecutorManagerException e1) {
+											e1.printStackTrace();
+										}
+										logger.error(e.getMessage());
+										return;
+									}
+
+									// Copy files to the source.
+									File executionDir = new File(
+											exflow.getExecutionPath());
+									try {
+										projectManager
+												.copyProjectSourceFilesToDirectory(
+														project, executionDir);
+									} catch (ProjectManagerException e) {
+										try {
+											executorManager.cleanupAll(exflow);
+										} catch (ExecutorManagerException e1) {
+											e1.printStackTrace();
+										}
+										logger.error(e.getMessage());
+										return;
+									}
+
+									try {
+										executorManager.executeFlow(exflow);
+									} catch (ExecutorManagerException e) {
+										try {
+											executorManager.cleanupAll(exflow);
+										} catch (ExecutorManagerException e1) {
+											e1.printStackTrace();
+										}
+
+										logger.error(e.getMessage());
+										return;
+									}
+								} catch (JobExecutionException e) {
+									logger.info("Could not run flow. "
+											+ e.getMessage());
+								}
+								schedule.remove(runningFlow);
+
+								// Immediately reschedule if it's possible. Let
+								// the execution manager
+								// handle any duplicate runs.
+								if (runningFlow.updateTime()) {
+									schedule.add(runningFlow);
+								}
+								saveSchedule();
+							} else {
+								// wait until flow run
+								long millisWait = Math.max(0, schedFlow
+										.getNextExecTime().getMillis()
+										- (new DateTime()).getMillis());
+								try {
+									this.wait(Math.min(millisWait, TIMEOUT_MS));
+								} catch (InterruptedException e) {
+									// interruption should occur when items are
+									// added or removed from the queue.
+								}
+							}
+						}
+					} catch (Exception e) {
+						logger.error(
+								"Unexpected exception has been thrown in scheduler",
+								e);
+					} catch (Throwable e) {
+						logger.error(
+								"Unexpected throwable has been thrown in scheduler",
+								e);
+					}
+				}
+			}
+		}
+
+		/**
+		 * Class to sort the schedule based on time.
+		 * 
+		 * @author Richard Park
+		 */
+		private class ScheduleComparator implements Comparator<ScheduledFlow> {
+			@Override
+			public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
+				DateTime first = arg1.getNextExecTime();
+				DateTime second = arg0.getNextExecTime();
+
+				if (first.isEqual(second)) {
+					return 0;
+				} else if (first.isBefore(second)) {
+					return 1;
+				}
+
+				return -1;
+			}
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 83d394a..f3fb647 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -442,7 +442,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 			return;
 		}
-		
 
 		try {
 			executorManager.executeFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 648333b..6d54af0 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -37,6 +37,8 @@ import azkaban.flow.Node;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.project.ProjectManagerException;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduledFlow;
 import azkaban.user.Permission;
 import azkaban.user.UserManager;
 import azkaban.user.Permission.Type;
@@ -55,6 +57,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
+	private ScheduleManager scheduleManager;
 	private MultipartParser multipartParser;
 	private File tempDir;
 	private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
@@ -69,6 +72,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		super.init(config);
 		projectManager = this.getApplication().getProjectManager();
 		executorManager = this.getApplication().getExecutorManager();
+		scheduleManager = this.getApplication().getScheduleManager();
 		
 		tempDir = this.getApplication().getTempDirectory();
 		multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
@@ -89,6 +93,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 			else if (hasParam(req, "flow")) {
 				handleFlowPage(req, resp, session);
 			}
+			else if (hasParam(req, "delete")) {
+				handleRemoveProject(req, resp, session);
+			}
 			else {
 				handleProjectPage(req, resp, session);
 			}
@@ -220,6 +227,63 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		ret.put("executions", history);
 	}
 	
+	private void handleRemoveProject(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		User user = session.getUser();
+		String projectName = getParam(req, "project");
+		
+		Project project = projectManager.getProject(projectName);
+		if (project == null) {
+			this.setErrorMessageInCookie(resp, "Project " + projectName + " doesn't exist.");
+			resp.sendRedirect(req.getContextPath());
+			return;
+		}
+		
+		if (!project.hasPermission(user, Type.ADMIN)) {
+			this.setErrorMessageInCookie(resp, "Cannot delete. User '" + user.getUserId() + "' is not an ADMIN.");
+			resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+			return;
+		}
+		
+		// Check if scheduled
+		ScheduledFlow sflow = null;
+		for (ScheduledFlow flow: scheduleManager.getSchedule()) {
+			if (flow.getProjectId().equals(projectName)) {
+				sflow = flow;
+				break;
+			}
+		}
+		if (sflow != null) {
+			this.setErrorMessageInCookie(resp, "Cannot delete. Please unschedule " + sflow.getScheduleId() + ".");
+			resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+			return;
+		}
+
+		// Check if executing
+		ExecutableFlow exflow = null;
+		for (ExecutableFlow flow: executorManager.getRunningFlows()) {
+			if (flow.getProjectId() == projectName) {
+				exflow = flow;
+				break;
+			}
+		}
+		if (exflow != null) {
+			this.setErrorMessageInCookie(resp, "Cannot delete. Executable flow " + exflow.getExecutionId() + " is still running.");
+			resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+			return;
+		}
+
+		try {
+			projectManager.removeProject(projectName);
+		} catch (ProjectManagerException e) {
+			this.setErrorMessageInCookie(resp, e.getMessage());
+			resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+			return;
+		}
+		
+		this.setSuccessMessageInCookie(resp, "Project '" + projectName + "' was successfully deleted.");
+		resp.sendRedirect(req.getContextPath());
+	}
+	
 	private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
 		String description = getParam(req, "description");
 		project.setDescription(description);
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 59b931c..a05ca60 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -178,10 +178,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		DateTime submitTime = new DateTime();
 		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
 		
-		scheduleManager.schedule(scheduleId,userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
+		scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
 		
-		
-
 		ret.put("status", "success");
 		ret.put("message", scheduleId + " scheduled.");
 	
diff --git a/src/java/azkaban/webapp/servlet/velocity/index.vm b/src/java/azkaban/webapp/servlet/velocity/index.vm
index 0c1555d..f7c3944 100644
--- a/src/java/azkaban/webapp/servlet/velocity/index.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/index.vm
@@ -23,6 +23,11 @@
 		<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>  
 
 		<div class="content">
+			#if($error_message != "null")
+				<div class="box-error-message">$error_message</div>
+			#elseif($success_message != "null")
+							<div class="box-success-message">$success_message</div>
+			#end
 			<div id="all-jobs-content">
 				<div class="section-hd">
 #if ($allProjects)
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 97fe423..cd2695f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -49,6 +49,10 @@
 						<tr><td class="first">Project Admins:</td><td>$admins</td></tr>
 						<tr><td class="first">Your Permissions:</td><td>$userpermission.toString()</td></tr>
 					</table>
+					#if($admin)
+						<div id="deleteProject" class="btn6">Delete Project</div>
+					#end
+
 				</div>
 
 				<div id="project-summary">
@@ -59,7 +63,7 @@
 						<tr><td class="first">Modified by:</td><td>$project.lastModifiedUser</td></tr>
 						<tr><td class="first">Description:</td><td id="pdescription">$project.description</td>
 							#if($admin)
-								<td><div id="edit" class="btn5">Edit</div></td>
+								<td><div id="edit" class="btn5">Edit Description</div></td>
 							#end
 						</tr>
 					</table>
@@ -131,6 +135,22 @@
 				</div>
 			</div>
 		</div>
+		<div id="delete-project" class="modal">
+			<h3>Delete Project</h3>
+			<div class="warn">
+				<div class="warning-icon"></div>
+				<div class="warning-message"><p>Warning: This project will be deleted and may not be recoverable.</p></div>
+			</div>
+			<form id="delete-form">
+				<input type="hidden" name="project" value="$project.name" />
+				<input type="hidden" name="delete" value="true" />
+			</form>
+			
+			<div class="actions">
+				<a class="no simplemodal-close btn3" href="#">Cancel</a>
+				<a class="yes btn6" id="delete-btn" href="#">Yes</a>
+			</div>
+		</div>
 	</body>
 </html>
 
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 449e05d..2679b52 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -420,10 +420,10 @@ tr:hover td {
 	background: linear-gradient(to bottom, #f2f5f6 0%,#e3eaed 37%,#c8d7dc 100%); /* W3C */
 	filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#f2f5f6', endColorstr='#c8d7dc',GradientType=0 ); /* IE6-9 */
 	  
-	  border-color: #9BA7AA;
-	  color: #61696B;
-	  display: inline-block;
-	  font-weight: bold;
+	border-color: #9BA7AA;
+	color: #61696B;
+	display: inline-block;
+	font-weight: bold;
 }
 .btn7:hover {
 	background: #fcfeff; /* Old browsers */
@@ -621,7 +621,8 @@ tr:hover td {
 }
 
 .modal .btn2,
-.modal .btn3 {
+.modal .btn3,
+.modal .btn6 {
   float: right;
   margin-left: 10px;
 }
@@ -968,6 +969,12 @@ tr:hover td {
 	width: 30%;
 }
 
+#deleteProject {
+	width: 100px;
+	text-align: center;
+	margin-top: 10px;
+}
+
 #job-summary {
 	margin-left: 30px;
 	margin-bottom: 10px;
@@ -995,7 +1002,7 @@ tr:hover td {
 	border-width: 0px;
 }
 .summary-table .first {
-	width: 100px;
+	min-width: 100px;
 	font-weight: bold;
 }
 
@@ -1379,7 +1386,7 @@ table.parameters tr td {
 }
 
 #edit {
-	width: 60px;
+	width: 90px;
 	text-align: center;
 }
 
@@ -1753,8 +1760,25 @@ ul.disableMenu {
 	margin-left: 15px;
 }
 
-/* old styles */
+.warn {
+	margin-left: 30px;
+	height: 40px;
+}
+
+.warning-icon {
+	float: left;
+	background-image: url("./images/redwarning.png");
+	width: 48px;
+	height: 43px;
+}
+
+.warning-message {
+	float: left;
+	margin-left: 10px;
+	margin-top: 10px;
+}
 
+/* old styles */
 .azkaban-charts .hitarea {
 	background-image: url("../../js/jqueryui/themes/custom-theme/images/ui-icons_cccccc_256x240.png");
 	background-position: 0 -16px;
diff --git a/src/web/css/images/redwarning.png b/src/web/css/images/redwarning.png
new file mode 100644
index 0000000..08c5fe6
Binary files /dev/null and b/src/web/css/images/redwarning.png differ
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index 0365e80..8694413 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -6,7 +6,6 @@ azkaban.ProjectView= Backbone.View.extend({
       "click #project-upload-btn":"handleUploadProjectJob"
   },
   initialize : function(settings) {
-
   },
   handleUploadProjectJob : function(evt) {
       console.log("click upload project");
@@ -43,6 +42,20 @@ azkaban.UploadProjectView= Backbone.View.extend({
   }
 });
 
+var deleteProjectView;
+azkaban.DeleteProjectView= Backbone.View.extend({
+  events : {
+    "click #delete-btn": "handleDeleteProject"
+  },
+  initialize : function(settings) {
+  },
+  handleDeleteProject : function(evt) {
+  	$("#delete-form").submit();
+  },
+  render: function() {
+  }
+});
+
 var flowTableView;
 azkaban.FlowTableView= Backbone.View.extend({
   events : {
@@ -166,7 +179,7 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
       var editText = $("#edit").text();
       var descriptionTD = $('#pdescription');
       
-      if (editText != "Edit") {
+      if (editText != "Edit Description") {
           var requestURL = contextURL + "/manager";
           var newText = $("#descEdit").val();
 
@@ -184,10 +197,9 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
           $(descriptionTD).remove("#descEdit");
           $(descriptionTD).text(newText);
           
-          $("#edit").text("Edit");
+          $("#edit").text("Edit Description");
       }
       else {
-      
 	      var text = $(descriptionTD).text();
 	      var edit = document.createElement("textarea");
 	      
@@ -204,12 +216,29 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
   }
 });
 
-
 $(function() {
 	projectView = new azkaban.ProjectView({el:$('#all-jobs-content')});
 	uploadView = new azkaban.UploadProjectView({el:$('#upload-project')});
 	flowTableView = new azkaban.FlowTableView({el:$('#flow-tabs')});
 	projectSummary = new azkaban.ProjectSummaryView({el:$('#project-summary')});
+	deleteProjectView = new azkaban.DeleteProjectView({el: $('#delete-project')});
 	// Setting up the project tabs
 
+	$('#deleteProject').click(function() {
+		$('#delete-project').modal({
+	        closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+	        position: ["20%",],
+	        containerId: 'confirm-container',
+	        containerCss: {
+	          'height': '220px',
+	          'width': '565px'
+	        },
+	        onShow: function (dialog) {
+	          var modal = this;
+	          $("#errorMsg").hide();
+	        }
+	    });
+	}
+	);
+
 });
diff --git a/unit/java/azkaban/scheduler/MockLoader.java b/unit/java/azkaban/scheduler/MockLoader.java
index 0d1f5eb..867b502 100644
--- a/unit/java/azkaban/scheduler/MockLoader.java
+++ b/unit/java/azkaban/scheduler/MockLoader.java
@@ -9,8 +9,8 @@ import org.joda.time.Period;
 public class MockLoader implements ScheduleLoader {
 	private ArrayList<ScheduledFlow> scheduledFlow = new ArrayList<ScheduledFlow>();
 	
-	public void addScheduledFlow(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExec, Period recurrence) {
-		ScheduledFlow flow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExec, recurrence);
+	public void addScheduledFlow(String scheduleId, String projectId, String flowId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExec, Period recurrence) {
+		ScheduledFlow flow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExec, recurrence);
 		addScheduleFlow(flow);
 	}