azkaban-developers
Changes
src/java/azkaban/project/ProjectLogger.java 62(+62 -0)
src/java/azkaban/scheduler/LocalFileScheduleLoader.java 456(+230 -226)
src/java/azkaban/scheduler/ScheduledFlow.java 397(+201 -196)
src/java/azkaban/scheduler/ScheduleManager.java 791(+423 -368)
src/web/css/azkaban.css 40(+32 -8)
src/web/css/images/redwarning.png 0(+0 -0)
src/web/js/azkaban.project.view.js 39(+34 -5)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index dcf4de2..53dcf34 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -35,7 +35,7 @@ public class ExecutableFlow {
private boolean submitted = false;
public enum Status {
- FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, PAUSED, UNKNOWN
+ FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN, PAUSED
}
public ExecutableFlow(String id, Flow flow) {
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 09628b4..e27a65d 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -82,8 +82,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void createLogger() {
// Create logger
- String loggerName = System.currentTimeMillis() + "."
- + flow.getExecutionId();
+ String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
logger = Logger.getLogger(loggerName);
// Create file appender
diff --git a/src/java/azkaban/project/FileProjectManager.java b/src/java/azkaban/project/FileProjectManager.java
index 01ee65e..ee86bab 100644
--- a/src/java/azkaban/project/FileProjectManager.java
+++ b/src/java/azkaban/project/FileProjectManager.java
@@ -41,6 +41,7 @@ import azkaban.utils.Props;
*/
public class FileProjectManager implements ProjectManager {
public static final String DIRECTORY_PARAM = "file.project.loader.path";
+ private static final String DELETED_PROJECT_PREFIX = ".DELETED.";
private static final DateTimeFormatter FILE_DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd-HH:mm.ss.SSS");
private static final String PROPERTIES_FILENAME = "project.json";
private static final String PROJECT_DIRECTORY = "src";
@@ -99,6 +100,9 @@ public class FileProjectManager implements ProjectManager {
if (!dir.isDirectory()) {
logger.error("ERROR loading project from " + dir.getPath() + ". Not a directory.");
}
+ else if (dir.getName().startsWith(DELETED_PROJECT_PREFIX)) {
+ continue;
+ }
else {
File propertiesFile = new File(dir, PROPERTIES_FILENAME);
if (!propertiesFile.exists()) {
@@ -435,10 +439,25 @@ public class FileProjectManager implements ProjectManager {
}
@Override
- public synchronized Project removeProject(String projectName) {
- return null;
+ public synchronized Project removeProject(String projectName) throws ProjectManagerException {
+ Project project = this.getProject(projectName);
+
+ if (project == null) {
+ throw new ProjectManagerException("Project " + projectName + " doesn't exist.");
+ }
+
+ File projectPath = new File(projectDirectory, projectName);
+ File deletedProjectPath = new File(projectDirectory, DELETED_PROJECT_PREFIX + System.currentTimeMillis() + "." + projectName);
+ if (projectPath.exists()) {
+ if (!projectPath.renameTo(deletedProjectPath)) {
+ throw new ProjectManagerException("Deleting of project failed.");
+ }
+ }
+
+ projects.remove(projectName);
+ return project;
}
-
+
private static class SuffixFilter implements FileFilter {
private String suffix;
src/java/azkaban/project/ProjectLogger.java 62(+62 -0)
diff --git a/src/java/azkaban/project/ProjectLogger.java b/src/java/azkaban/project/ProjectLogger.java
new file mode 100644
index 0000000..5d6760f
--- /dev/null
+++ b/src/java/azkaban/project/ProjectLogger.java
@@ -0,0 +1,62 @@
+package azkaban.project;
+
+import java.io.File;
+import java.util.logging.Level;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+/*
+ *
+ */
+public class ProjectLogger {
+
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final int TIME_TO_IDLE_SECS = 30 * 60; // 30 mins
+ private static final String PROJECT_SUFFIX = ".PROJECT";
+ private CacheManager manager = CacheManager.create();
+ private Cache cache;
+ private String path;
+
+ public ProjectLogger(String projectPath) {
+ CacheConfiguration config = new CacheConfiguration();
+ config.setName("loggerCache");
+ config.setTimeToLiveSeconds(TIME_TO_IDLE_SECS);
+ config.eternal(false);
+ config.diskPersistent(false);
+ config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
+
+ cache = new Cache(config);
+ manager.addCache(cache);
+ }
+
+ public void log(Level level, String project, String message) {
+
+ }
+
+ private Logger getLogger(String projectId) {
+ Element element = cache.get(projectId);
+ Logger logger = Logger.getLogger(projectId + PROJECT_SUFFIX);
+
+// if (element == null) {
+// File file = new File(path);
+// new File(file, );
+// Appender appender = new FileAppender(DEFAULT_LAYOUT, , false);
+// logger.addAppender(jobAppender);
+// }
+// else {
+// Object obj = element.getObjectValue();
+// }
+
+ return null;
+ }
+}
src/java/azkaban/scheduler/LocalFileScheduleLoader.java 456(+230 -226)
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
index 64d7654..733ee77 100644
--- a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -25,7 +25,6 @@ import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import azkaban.user.User;
import azkaban.utils.Props;
import azkaban.utils.JSONUtils;
@@ -51,10 +50,12 @@ import azkaban.utils.JSONUtils;
*/
public class LocalFileScheduleLoader implements ScheduleLoader {
private static final String SCHEDULEID = "scheduleId";
+ private static final String PROJECTID = "projectId";
+ private static final String FLOWID = "flowId";
private static final String USER = "user";
private static final String USERSUBMIT = "userSubmit";
- private static final String SUBMITTIME = "submitTime";
- private static final String FIRSTSCHEDTIME = "firstSchedTime";
+ private static final String SUBMITTIME = "submitTime";
+ private static final String FIRSTSCHEDTIME = "firstSchedTime";
private static final String SCHEDULE = "schedule";
private static final String NEXTEXECTIME = "nextExecTime";
@@ -111,79 +112,77 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
@Override
public List<ScheduledFlow> loadSchedule() {
- if (scheduleFile != null && backupScheduleFile != null) {
- if (scheduleFile.exists()) {
- if(scheduleFile.length() == 0)
- return new ArrayList<ScheduledFlow>();
+ if (scheduleFile != null && backupScheduleFile != null) {
+ if (scheduleFile.exists()) {
+ if (scheduleFile.length() == 0)
+ return new ArrayList<ScheduledFlow>();
return loadFromFile(scheduleFile);
- }
- else if (backupScheduleFile.exists()) {
- backupScheduleFile.renameTo(scheduleFile);
+ } else if (backupScheduleFile.exists()) {
+ backupScheduleFile.renameTo(scheduleFile);
return loadFromFile(scheduleFile);
- }
- else {
- logger.warn("No schedule files found looking for " + scheduleFile.getAbsolutePath());
- }
- }
-
- return new ArrayList<ScheduledFlow>();
+ } else {
+ logger.warn("No schedule files found looking for "
+ + scheduleFile.getAbsolutePath());
+ }
+ }
+
+ return new ArrayList<ScheduledFlow>();
}
@Override
public void saveSchedule(List<ScheduledFlow> schedule) {
- if (scheduleFile != null && backupScheduleFile != null) {
- // Delete the backup if it exists and a current file exists.
- if (backupScheduleFile.exists() && scheduleFile.exists()) {
- backupScheduleFile.delete();
- }
-
- // Rename the schedule if it exists.
- if (scheduleFile.exists()) {
- scheduleFile.renameTo(backupScheduleFile);
- }
-
- HashMap<String,Object> obj = new HashMap<String,Object>();
- ArrayList<Object> schedules = new ArrayList<Object>();
- obj.put(SCHEDULE, schedules);
- //Write out schedule.
-
- for (ScheduledFlow schedFlow : schedule) {
- schedules.add(createJSONObject(schedFlow));
- }
-
- try {
- FileWriter writer = new FileWriter(scheduleFile);
- writer.write(JSONUtils.toJSON(obj, true));
- writer.flush();
- } catch (Exception e) {
- throw new RuntimeException("Error saving flow file", e);
- }
- logger.info("schedule saved");
- }
+ if (scheduleFile != null && backupScheduleFile != null) {
+ // Delete the backup if it exists and a current file exists.
+ if (backupScheduleFile.exists() && scheduleFile.exists()) {
+ backupScheduleFile.delete();
+ }
+
+ // Rename the schedule if it exists.
+ if (scheduleFile.exists()) {
+ scheduleFile.renameTo(backupScheduleFile);
+ }
+
+ HashMap<String, Object> obj = new HashMap<String, Object>();
+ ArrayList<Object> schedules = new ArrayList<Object>();
+ obj.put(SCHEDULE, schedules);
+ // Write out schedule.
+
+ for (ScheduledFlow schedFlow : schedule) {
+ schedules.add(createJSONObject(schedFlow));
+ }
+
+ try {
+ FileWriter writer = new FileWriter(scheduleFile);
+ writer.write(JSONUtils.toJSON(obj, true));
+ writer.flush();
+ } catch (Exception e) {
+ throw new RuntimeException("Error saving flow file", e);
+ }
+ logger.info("schedule saved");
+ }
}
@SuppressWarnings("unchecked")
- private List<ScheduledFlow> loadFromFile(File schedulefile)
- {
- BufferedReader reader = null;
+ private List<ScheduledFlow> loadFromFile(File schedulefile) {
+ BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(schedulefile));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
logger.error("Error loading schedule file ", e);
}
- List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
-
+ List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
+
HashMap<String, Object> schedule;
try {
- //TODO handle first time empty schedule file
- schedule = (HashMap<String,Object>)JSONUtils.parseJSONFromReader(reader);
+ // TODO handle first time empty schedule file
+ schedule = (HashMap<String, Object>) JSONUtils
+ .parseJSONFromReader(reader);
} catch (Exception e) {
- //schedule = loadLegacyFile(schedulefile);
+ // schedule = loadLegacyFile(schedulefile);
logger.error("Error parsing the schedule file", e);
throw new RuntimeException("Error parsing the schedule file", e);
- }
- finally {
+ } finally {
try {
reader.close();
} catch (IOException e) {
@@ -200,178 +199,183 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
}
return scheduleList;
- }
-
- private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
- String scheduleId = (String)obj.get(SCHEDULEID);
- String user = (String)obj.get(USER);
- String userSubmit = (String)obj.get(USERSUBMIT);
- String submitTimeRaw = (String)obj.get(SUBMITTIME);
- String firstSchedTimeRaw = (String)obj.get(FIRSTSCHEDTIME);
- String nextExecTimeRaw = (String)obj.get(NEXTEXECTIME);
- String timezone = (String)obj.get(TIMEZONE);
- String recurrence = (String)obj.get(RECURRENCE);
-// String scheduleStatus = (String)obj.get(SCHEDULESTATUS);
-
- DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);
- DateTime submitTime = FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
- DateTime firstSchedTime = FILE_DATEFORMAT.parseDateTime(firstSchedTimeRaw);
-
- if (nextExecTime == null) {
- logger.error("No next execution time has been set");
- return null;
- }
-
- if (submitTime == null) {
- logger.error("No submitTime has been set");
- }
-
- if(firstSchedTime == null){
- logger.error("No first scheduled time has been set");
- }
-
- if (timezone != null) {
- nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone.forID(timezone));
- }
-
- ReadablePeriod period = null;
- if (recurrence != null) {
- period = parsePeriodString(scheduleId, recurrence);
- }
-
- ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
- if (scheduledFlow.updateTime()) {
- return scheduledFlow;
- }
-
- logger.info("Removed " + scheduleId + " off out of scheduled. It is not recurring.");
- return null;
- }
-
- private HashMap<String,Object> createJSONObject(ScheduledFlow flow) {
- HashMap<String,Object> object = new HashMap<String,Object>();
- object.put(SCHEDULEID, flow.getScheduleId());
- object.put(USER, flow.getUser());
- object.put(USERSUBMIT, flow.getUserSubmit());
-
- object.put(SUBMITTIME, FILE_DATEFORMAT.print(flow.getSubmitTime()));
- object.put(FIRSTSCHEDTIME, FILE_DATEFORMAT.print(flow.getFirstSchedTime()));
-
- object.put(NEXTEXECTIME, FILE_DATEFORMAT.print(flow.getNextExecTime()));
- object.put(TIMEZONE, flow.getNextExecTime().getZone().getID());
- object.put(RECURRENCE, createPeriodString(flow.getPeriod()));
-// object.put(SCHEDULESTATUS, flow.getSchedStatus());
-
- return object;
- }
-
- private ReadablePeriod parsePeriodString(String scheduleId, String periodStr)
- {
- ReadablePeriod period;
- char periodUnit = periodStr.charAt(periodStr.length() - 1);
- if (periodUnit == 'n') {
- return null;
- }
-
- int periodInt = Integer.parseInt(periodStr.substring(0, periodStr.length() - 1));
- switch (periodUnit) {
- case 'M':
- period = Months.months(periodInt);
- break;
- case 'w':
- period = Weeks.weeks(periodInt);
- break;
- case 'd':
- period = Days.days(periodInt);
- break;
- case 'h':
- period = Hours.hours(periodInt);
- break;
- case 'm':
- period = Minutes.minutes(periodInt);
- break;
- case 's':
- period = Seconds.seconds(periodInt);
- break;
- default:
- throw new IllegalArgumentException("Invalid schedule period unit '" + periodUnit + "' for flow " + scheduleId);
- }
-
- return period;
- }
-
- private String createPeriodString(ReadablePeriod period)
- {
- String periodStr = "n";
-
- if (period == null) {
- return "n";
- }
-
- if (period.get(DurationFieldType.months()) > 0) {
- int months = period.get(DurationFieldType.months());
- periodStr = months + "M";
- }
- else if (period.get(DurationFieldType.weeks()) > 0) {
- int weeks = period.get(DurationFieldType.weeks());
- periodStr = weeks + "w";
- }
- else if (period.get(DurationFieldType.days()) > 0) {
- int days = period.get(DurationFieldType.days());
- periodStr = days + "d";
- }
- else if (period.get(DurationFieldType.hours()) > 0) {
- int hours = period.get(DurationFieldType.hours());
- periodStr = hours + "h";
- }
- else if (period.get(DurationFieldType.minutes()) > 0) {
- int minutes = period.get(DurationFieldType.minutes());
- periodStr = minutes + "m";
- }
- else if (period.get(DurationFieldType.seconds()) > 0) {
- int seconds = period.get(DurationFieldType.seconds());
- periodStr = seconds + "s";
- }
-
- return periodStr;
- }
-
-// private HashMap<String,Object> loadLegacyFile(File schedulefile) {
-// Props schedule = null;
-// try {
-// schedule = new Props(null, schedulefile.getAbsolutePath());
-// } catch(Exception e) {
-// throw new RuntimeException("Error loading schedule from " + schedulefile);
-// }
+ }
+
+ private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
+ String scheduleId = (String) obj.get(SCHEDULEID);
+ String projectId = (String) obj.get(PROJECTID);
+ String flowId = (String) obj.get(FLOWID);
+ String user = (String) obj.get(USER);
+ String userSubmit = (String) obj.get(USERSUBMIT);
+ String submitTimeRaw = (String) obj.get(SUBMITTIME);
+ String firstSchedTimeRaw = (String) obj.get(FIRSTSCHEDTIME);
+ String nextExecTimeRaw = (String) obj.get(NEXTEXECTIME);
+ String timezone = (String) obj.get(TIMEZONE);
+ String recurrence = (String) obj.get(RECURRENCE);
+ // String scheduleStatus = (String)obj.get(SCHEDULESTATUS);
+
+ DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);
+ DateTime submitTime = FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
+ DateTime firstSchedTime = FILE_DATEFORMAT
+ .parseDateTime(firstSchedTimeRaw);
+
+ if (nextExecTime == null) {
+ logger.error("No next execution time has been set");
+ return null;
+ }
+
+ if (submitTime == null) {
+ logger.error("No submitTime has been set");
+ }
+
+ if (firstSchedTime == null) {
+ logger.error("No first scheduled time has been set");
+ }
+
+ if (timezone != null) {
+ nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone
+ .forID(timezone));
+ }
+
+ ReadablePeriod period = null;
+ if (recurrence != null) {
+ period = parsePeriodString(scheduleId, recurrence);
+ }
+
+ ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
+ if (scheduledFlow.updateTime()) {
+ return scheduledFlow;
+ }
+
+ logger.info("Removed " + scheduleId
+ + " off out of scheduled. It is not recurring.");
+ return null;
+ }
+
+ private HashMap<String, Object> createJSONObject(ScheduledFlow flow) {
+ HashMap<String, Object> object = new HashMap<String, Object>();
+ object.put(SCHEDULEID, flow.getScheduleId());
+ object.put(PROJECTID, flow.getProjectId());
+ object.put(FLOWID, flow.getFlowId());
+ object.put(USER, flow.getUser());
+ object.put(USERSUBMIT, flow.getUserSubmit());
+
+ object.put(SUBMITTIME, FILE_DATEFORMAT.print(flow.getSubmitTime()));
+ object.put(FIRSTSCHEDTIME,
+ FILE_DATEFORMAT.print(flow.getFirstSchedTime()));
+
+ object.put(NEXTEXECTIME, FILE_DATEFORMAT.print(flow.getNextExecTime()));
+ object.put(TIMEZONE, flow.getNextExecTime().getZone().getID());
+ object.put(RECURRENCE, createPeriodString(flow.getPeriod()));
+ // object.put(SCHEDULESTATUS, flow.getSchedStatus());
+
+ return object;
+ }
+
+ private ReadablePeriod parsePeriodString(String scheduleId, String periodStr) {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0,
+ periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit + "' for flow " + scheduleId);
+ }
+
+ return period;
+ }
+
+ private String createPeriodString(ReadablePeriod period) {
+ String periodStr = "n";
+
+ if (period == null) {
+ return "n";
+ }
+
+ if (period.get(DurationFieldType.months()) > 0) {
+ int months = period.get(DurationFieldType.months());
+ periodStr = months + "M";
+ } else if (period.get(DurationFieldType.weeks()) > 0) {
+ int weeks = period.get(DurationFieldType.weeks());
+ periodStr = weeks + "w";
+ } else if (period.get(DurationFieldType.days()) > 0) {
+ int days = period.get(DurationFieldType.days());
+ periodStr = days + "d";
+ } else if (period.get(DurationFieldType.hours()) > 0) {
+ int hours = period.get(DurationFieldType.hours());
+ periodStr = hours + "h";
+ } else if (period.get(DurationFieldType.minutes()) > 0) {
+ int minutes = period.get(DurationFieldType.minutes());
+ periodStr = minutes + "m";
+ } else if (period.get(DurationFieldType.seconds()) > 0) {
+ int seconds = period.get(DurationFieldType.seconds());
+ periodStr = seconds + "s";
+ }
+
+ return periodStr;
+ }
+
+// private HashMap<String, Object> loadLegacyFile(File schedulefile) {
+// Props schedule = null;
+// try {
+// schedule = new Props(null, schedulefile.getAbsolutePath());
+// } catch (Exception e) {
+// throw new RuntimeException("Error loading schedule from "
+// + schedulefile);
+// }
//
-// ArrayList<Object> flowScheduleList = new ArrayList<Object>();
-// for(String key: schedule.getKeySet()) {
-// HashMap<String,Object> scheduledMap = parseScheduledFlow(key, schedule.get(key));
-// if (scheduledMap == null) {
-// flowScheduleList.add(scheduledMap);
-// }
-// }
+// ArrayList<Object> flowScheduleList = new ArrayList<Object>();
+// for (String key : schedule.getKeySet()) {
+// HashMap<String, Object> scheduledMap = parseScheduledFlow(key,
+// schedule.get(key));
+// if (scheduledMap == null) {
+// flowScheduleList.add(scheduledMap);
+// }
+// }
+//
+// HashMap<String, Object> scheduleMap = new HashMap<String, Object>();
+// scheduleMap.put(SCHEDULE, flowScheduleList);
+//
+// return scheduleMap;
+// }
+
+// private HashMap<String, Object> parseScheduledFlow(String name, String flow) {
+// String[] pieces = flow.split("\\s+");
//
-// HashMap<String,Object> scheduleMap = new HashMap<String,Object>();
-// scheduleMap.put(SCHEDULE, flowScheduleList );
-//
-// return scheduleMap;
-// }
-
-// private HashMap<String,Object> parseScheduledFlow(String name, String flow) {
-// String[] pieces = flow.split("\\s+");
+// if (pieces.length != 3) {
+// logger.warn("Error loading schedule from file " + name);
+// return null;
+// }
//
-// if(pieces.length != 3) {
-// logger.warn("Error loading schedule from file " + name);
-// return null;
-// }
+// HashMap<String, Object> scheduledFlow = new HashMap<String, Object>();
+// scheduledFlow.put(PROJECTID, name);
+// scheduledFlow.put(TIME, pieces[0]);
+// scheduledFlow.put(RECURRENCE, pieces[1]);
+// Boolean dependency = Boolean.parseBoolean(pieces[2]);
//
-// HashMap<String,Object> scheduledFlow = new HashMap<String,Object>();
-// scheduledFlow.put(PROJECTID, name);
-// scheduledFlow.put(TIME, pieces[0]);
-// scheduledFlow.put(RECURRENCE, pieces[1]);
-// Boolean dependency = Boolean.parseBoolean(pieces[2]);
-//
-// return scheduledFlow;
-// }
+// return scheduledFlow;
+// }
}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduledFlow.java 397(+201 -196)
diff --git a/src/java/azkaban/scheduler/ScheduledFlow.java b/src/java/azkaban/scheduler/ScheduledFlow.java
index 0b1f486..1e92af2 100644
--- a/src/java/azkaban/scheduler/ScheduledFlow.java
+++ b/src/java/azkaban/scheduler/ScheduledFlow.java
@@ -19,10 +19,8 @@ package azkaban.scheduler;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
-import azkaban.user.User;
import azkaban.utils.Utils;
-
/**
* Schedule for a job instance. This is decoupled from the execution.
*
@@ -31,196 +29,207 @@ import azkaban.utils.Utils;
*/
public class ScheduledFlow {
- //use projectId.flowId to form a unique scheduleId
- private final String scheduleId;
-
- private final ReadablePeriod period;
- private DateTime nextExecTime;
- private final String user;
- private final String userSubmit;
- private final DateTime submitTime;
- private final DateTime firstSchedTime;
-// private SchedStatus schedStatus;
-
- public enum SchedStatus {
- LASTSUCCESS ("lastsuccess"),
- LASTFAILED ("lastfailed"),
- LASTPAUSED ("lastpaused");
-
- private final String status;
- SchedStatus(String status){
- this.status = status;
- }
- private String status(){
- return this.status;
- }
- }
-
- /**
- * Constructor
- *
- * @param jobName Unique job name
- * @param nextExecution The next execution time
- * @param ignoreDependency
- */
- public ScheduledFlow(String scheduleId,
- String user,
- String userSubmit,
- DateTime submitTime,
- DateTime firstSchedTime,
- DateTime nextExecution) {
- this(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecution, null);
- }
-
- /**
- * Constructor
- *
- * @param jobId
- * @param nextExecution
- * @param period
- * @param ignoreDependency
- */
- public ScheduledFlow(String scheduleId,
- String user,
- String userSubmit,
- DateTime submitTime,
- DateTime firstSchedTime,
- DateTime nextExecution,
- ReadablePeriod period) {
- super();
- this.scheduleId = Utils.nonNull(scheduleId);
- this.user = user;
- this.userSubmit = userSubmit;
- this.submitTime = submitTime;
- this.firstSchedTime = firstSchedTime;
- this.period = period;
- this.nextExecTime = Utils.nonNull(nextExecution);
-// this.schedStatus = SchedStatus.LASTSUCCESS;
- }
-
- public ScheduledFlow(String scheduleId,
- String user,
- String userSubmit,
+ // use projectId.flowId to form a unique scheduleId
+ private final String scheduleId;
+ private final String flowId;
+ private final String projectId;
+
+ private final ReadablePeriod period;
+ private DateTime nextExecTime;
+ private final String user;
+ private final String userSubmit;
+ private final DateTime submitTime;
+ private final DateTime firstSchedTime;
+
+ // private SchedStatus schedStatus;
+
+ public enum SchedStatus {
+ LASTSUCCESS("lastsuccess"), LASTFAILED("lastfailed"), LASTPAUSED(
+ "lastpaused");
+
+ private final String status;
+
+ SchedStatus(String status) {
+ this.status = status;
+ }
+
+ private String status() {
+ return this.status;
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param jobId
+ * @param nextExecution
+ * @param period
+ * @param ignoreDependency
+ */
+ public ScheduledFlow(
+ String scheduleId,
+ String projectId,
+ String flowId,
+ String user,
+ String userSubmit,
DateTime submitTime,
DateTime firstSchedTime,
- ReadablePeriod period) {
- super();
- this.scheduleId = Utils.nonNull(scheduleId);
- this.user = user;
- this.userSubmit = userSubmit;
- this.submitTime = submitTime;
- this.firstSchedTime = firstSchedTime;
- this.period = period;
- this.nextExecTime = new DateTime(firstSchedTime);
-// this.schedStatus = SchedStatus.LASTSUCCESS;
+ DateTime nextExecution,
+ ReadablePeriod period)
+ {
+ super();
+ this.scheduleId = Utils.nonNull(scheduleId);
+ this.projectId = Utils.nonNull(projectId);
+ this.flowId = Utils.nonNull(flowId);
+ this.user = user;
+ this.userSubmit = userSubmit;
+ this.submitTime = submitTime;
+ this.firstSchedTime = firstSchedTime;
+ this.period = period;
+ this.nextExecTime = Utils.nonNull(nextExecution);
+ // this.schedStatus = SchedStatus.LASTSUCCESS;
}
- public ScheduledFlow(String scheduleId,
- String user,
- String userSubmit,
+ public ScheduledFlow(
+ String scheduleId,
+ String projectId,
+ String flowId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime,
+ ReadablePeriod period)
+ {
+ this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, new DateTime(), period);
+ }
+
+ public ScheduledFlow(
+ String scheduleId,
+ String projectId,
+ String flowId,
+ String user,
+ String userSubmit,
DateTime submitTime,
DateTime firstSchedTime) {
- super();
- this.scheduleId = Utils.nonNull(scheduleId);
- this.user = user;
- this.userSubmit = userSubmit;
- this.submitTime = submitTime;
- this.firstSchedTime = firstSchedTime;
- this.period = null;
- this.nextExecTime = new DateTime();
-// this.schedStatus = SchedStatus.LASTSUCCESS;
+ this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, new DateTime(), null);
}
-// public SchedStatus getSchedStatus() {
-// return this.schedStatus;
-// }
-//
-// public void setSchedStatus(SchedStatus schedStatus) {
-// this.schedStatus = schedStatus;
-// }
+ /**
+ * Constructor
+ *
+ * @param jobName
+ * Unique job name
+ * @param nextExecution
+ * The next execution time
+ * @param ignoreDependency
+ */
+ public ScheduledFlow(String scheduleId,
+ String projectId,
+ String flowId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime,
+ DateTime nextExecution)
+ {
+ this(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExecution, null);
+ }
+
+ // public SchedStatus getSchedStatus() {
+ // return this.schedStatus;
+ // }
+ //
+ // public void setSchedStatus(SchedStatus schedStatus) {
+ // this.schedStatus = schedStatus;
+ // }
/**
- * Updates the time to a future time after 'now' that matches the period description.
- *
- * @return
- */
- public boolean updateTime() {
- if (nextExecTime.isAfterNow()) {
- return true;
- }
-
- if (period != null) {
- DateTime other = getNextRuntime(nextExecTime, period);
-
- this.nextExecTime = other;
- return true;
- }
-
- return false;
- }
-
- /**
- * Calculates the next runtime by adding the period.
- *
- * @param scheduledDate
- * @param period
- * @return
- */
- private DateTime getNextRuntime(DateTime scheduledDate, ReadablePeriod period)
- {
- DateTime now = new DateTime();
- DateTime date = new DateTime(scheduledDate);
- int count = 0;
- while (!now.isBefore(date)) {
- if (count > 100000) {
- throw new IllegalStateException("100000 increments of period did not get to present time.");
- }
-
- if (period == null) {
- break;
- }
- else {
- date = date.plus(period);
- }
-
- count += 1;
- }
-
- return date;
- }
-
- /**
- * Returns the unique id of the job to be run.
- * @return
- */
-
- /**
- * Returns true if the job recurrs in the future
- * @return
- */
- public boolean isRecurring() {
- return this.period != null;
- }
-
- /**
- * Returns the recurrance period. Or null if not applicable
- * @return
- */
- public ReadablePeriod getPeriod() {
- return period;
- }
+ * Updates the time to a future time after 'now' that matches the period
+ * description.
+ *
+ * @return
+ */
+ public boolean updateTime() {
+ if (nextExecTime.isAfterNow()) {
+ return true;
+ }
+
+ if (period != null) {
+ DateTime other = getNextRuntime(nextExecTime, period);
+
+ this.nextExecTime = other;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Calculates the next runtime by adding the period.
+ *
+ * @param scheduledDate
+ * @param period
+ * @return
+ */
+ private DateTime getNextRuntime(DateTime scheduledDate,
+ ReadablePeriod period) {
+ DateTime now = new DateTime();
+ DateTime date = new DateTime(scheduledDate);
+ int count = 0;
+ while (!now.isBefore(date)) {
+ if (count > 100000) {
+ throw new IllegalStateException(
+ "100000 increments of period did not get to present time.");
+ }
+
+ if (period == null) {
+ break;
+ } else {
+ date = date.plus(period);
+ }
+
+ count += 1;
+ }
+
+ return date;
+ }
+
+ /**
+ * Returns the unique id of the job to be run.
+ *
+ * @return
+ */
+
+ /**
+ * Returns true if the job recurrs in the future
+ *
+ * @return
+ */
+ public boolean isRecurring() {
+ return this.period != null;
+ }
+
+ /**
+ * Returns the recurrance period. Or null if not applicable
+ *
+ * @return
+ */
+ public ReadablePeriod getPeriod() {
+ return period;
+ }
public DateTime getFirstSchedTime() {
return firstSchedTime;
}
/**
- * Returns the next scheduled execution
- * @return
- */
- public DateTime getNextExecTime() {
- return nextExecTime;
- }
+ * Returns the next scheduled execution
+ *
+ * @return
+ */
+ public DateTime getNextExecTime() {
+ return nextExecTime;
+ }
public String getUserSubmit() {
return userSubmit;
@@ -231,33 +240,29 @@ public class ScheduledFlow {
}
@Override
- public String toString()
- {
- return "ScheduledFlow{" +
-// "scheduleStatus=" + schedStatus +
- "nextExecTime=" + nextExecTime +
- ", period=" + period.toString() +
- ", firstSchedTime=" + firstSchedTime +
- ", submitTime=" + submitTime +
- ", userSubmit=" + userSubmit +
- ", user=" + user +
- ", scheduleId='" + scheduleId + '\'' +
- '}';
- }
+ public String toString() {
+ return "ScheduledFlow{"
+ +
+ // "scheduleStatus=" + schedStatus +
+ "nextExecTime=" + nextExecTime + ", period=" + period
+ + ", firstSchedTime=" + firstSchedTime + ", submitTime="
+ + submitTime + ", userSubmit=" + userSubmit + ", user=" + user
+ + ", scheduleId='" + scheduleId + '\'' + '}';
+ }
public String getUser() {
return user;
}
-
+
public String getScheduleId() {
return scheduleId;
}
-
- public String getFlowId(){
- return this.scheduleId.split("\\.")[1];
+
+ public String getFlowId() {
+ return flowId;
}
-
- public String getProjectId(){
- return this.scheduleId.split("\\.")[0];
+
+ public String getProjectId() {
+ return projectId;
}
}
src/java/azkaban/scheduler/ScheduleManager.java 791(+423 -368)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 12c9184..06d8ad4 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -31,380 +31,435 @@ import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Props;
-
-
/**
- * The ScheduleManager stores and executes the schedule. It uses a single thread instead
- * and waits until correct loading time for the flow. It will not remove the flow from the
- * schedule when it is run, which can potentially allow the flow to and overlap each other.
- *
- * @author Richard
+ * The ScheduleManager stores and executes the schedule. It uses a single thread
+ * instead and waits until correct loading time for the flow. It will not remove
+ * the flow from the schedule when it is run, which can potentially allow the
+ * flow to and overlap each other.
*/
public class ScheduleManager {
private static Logger logger = Logger.getLogger(ScheduleManager.class);
- private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+ private final DateTimeFormatter _dateFormat = DateTimeFormat
+ .forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
- private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
- private final ScheduleRunner runner;
- private final ExecutorManager executorManager;
- private final ProjectManager projectManager;
-
- /**
- * Give the schedule manager a loader class that will properly load the schedule.
- *
- * @param loader
- */
- public ScheduleManager(
- ExecutorManager executorManager,
- ProjectManager projectManager,
- ScheduleLoader loader)
- {
- this.executorManager = executorManager;
- this.projectManager = projectManager;
- this.loader = loader;
- this.runner = new ScheduleRunner();
-
- List<ScheduledFlow> scheduleList = loader.loadSchedule();
- for (ScheduledFlow flow: scheduleList) {
- internalSchedule(flow);
- }
-
- this.runner.start();
- }
-
- /**
- * Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
- */
- public void shutdown() {
- this.runner.shutdown();
- }
-
- /**
- * Retrieves a copy of the list of schedules.
- *
- * @return
- */
- public synchronized List<ScheduledFlow> getSchedule() {
- return runner.getSchedule();
- }
-
- /**
- * Returns the scheduled flow for the flow name
- *
- * @param id
- * @return
- */
- public ScheduledFlow getSchedule(String scheduleId) {
- return scheduleIDMap.get(scheduleId);
- }
-
- /**
- * Removes the flow from the schedule if it exists.
- *
- * @param id
- */
- public synchronized void removeScheduledFlow(String scheduleId) {
- ScheduledFlow flow = scheduleIDMap.get(scheduleId);
- scheduleIDMap.remove(scheduleId);
- runner.removeScheduledFlow(flow);
-
- loader.saveSchedule(getSchedule());
- }
-
-// public synchronized void pauseScheduledFlow(String scheduleId){
-// try{
-// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
-// flow.setSchedStatus(SchedStatus.LASTPAUSED);
-// loader.saveSchedule(getSchedule());
-// }
-// catch (Exception e) {
-// throw new RuntimeException("Error pausing a schedule " + scheduleId);
-// }
-// }
-//
-// public synchronized void resumeScheduledFlow(String scheduleId){
-// try {
-// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
-// flow.setSchedStatus(SchedStatus.LASTSUCCESS);
-// loader.saveSchedule(getSchedule());
-// }
-// catch (Exception e) {
-// throw new RuntimeException("Error resuming a schedule " + scheduleId);
-// }
-// }
-
- public void schedule(final String scheduleId,
- final String user,
- final String userSubmit,
- final DateTime submitTime,
- final DateTime firstSchedTime,
- final ReadablePeriod period
- ) {
- //TODO: should validate projectId and flowId?
- logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime)
- + " with a period of " + PeriodFormat.getDefault().print(period));
- schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, period));
+ private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
+ private final ScheduleRunner runner;
+ private final ExecutorManager executorManager;
+ private final ProjectManager projectManager;
+
+ /**
+ * Give the schedule manager a loader class that will properly load the
+ * schedule.
+ *
+ * @param loader
+ */
+ public ScheduleManager(ExecutorManager executorManager,
+ ProjectManager projectManager,
+ ScheduleLoader loader)
+ {
+ this.executorManager = executorManager;
+ this.projectManager = projectManager;
+ this.loader = loader;
+ this.runner = new ScheduleRunner();
+
+ List<ScheduledFlow> scheduleList = loader.loadSchedule();
+ for (ScheduledFlow flow : scheduleList) {
+ internalSchedule(flow);
+ }
+
+ this.runner.start();
+ }
+
+ /**
+ * Shutdowns the scheduler thread. After shutdown, it may not be safe to use
+ * it again.
+ */
+ public void shutdown() {
+ this.runner.shutdown();
}
-
- /**
- * Schedule the flow
- * @param flowId
- * @param date
- * @param ignoreDep
- */
- public void schedule(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime) {
- logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
- schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime));
- }
-
- /**
- * Schedules the flow, but doesn't save the schedule afterwards.
- * @param flow
- */
- private synchronized void internalSchedule(ScheduledFlow flow) {
- ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
- flow.updateTime();
- if (existing != null) {
- this.runner.removeScheduledFlow(existing);
- }
-
+
+ /**
+ * Retrieves a copy of the list of schedules.
+ *
+ * @return
+ */
+ public synchronized List<ScheduledFlow> getSchedule() {
+ return runner.getSchedule();
+ }
+
+ /**
+ * Returns the scheduled flow for the flow name
+ *
+ * @param id
+ * @return
+ */
+ public ScheduledFlow getSchedule(String scheduleId) {
+ return scheduleIDMap.get(scheduleId);
+ }
+
+ /**
+ * Removes the flow from the schedule if it exists.
+ *
+ * @param id
+ */
+ public synchronized void removeScheduledFlow(String scheduleId) {
+ ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+ scheduleIDMap.remove(scheduleId);
+ runner.removeScheduledFlow(flow);
+
+ loader.saveSchedule(getSchedule());
+ }
+
+ // public synchronized void pauseScheduledFlow(String scheduleId){
+ // try{
+ // ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+ // flow.setSchedStatus(SchedStatus.LASTPAUSED);
+ // loader.saveSchedule(getSchedule());
+ // }
+ // catch (Exception e) {
+ // throw new RuntimeException("Error pausing a schedule " + scheduleId);
+ // }
+ // }
+ //
+ // public synchronized void resumeScheduledFlow(String scheduleId){
+ // try {
+ // ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+ // flow.setSchedStatus(SchedStatus.LASTSUCCESS);
+ // loader.saveSchedule(getSchedule());
+ // }
+ // catch (Exception e) {
+ // throw new RuntimeException("Error resuming a schedule " + scheduleId);
+ // }
+ // }
+
+ public void schedule(
+ final String scheduleId,
+ final String projectId,
+ final String flowId,
+ final String user,
+ final String userSubmit,
+ final DateTime submitTime,
+ final DateTime firstSchedTime,
+ final ReadablePeriod period) {
+ logger.info("Scheduling flow '" + scheduleId + "' for "
+ + _dateFormat.print(firstSchedTime) + " with a period of "
+ + PeriodFormat.getDefault().print(period));
+
+ schedule(new ScheduledFlow(scheduleId, projectId, flowId, user,
+ userSubmit, submitTime, firstSchedTime, period));
+ }
+
+ /**
+ * Schedule the flow
+ *
+ * @param flowId
+ * @param date
+ * @param ignoreDep
+ */
+ public void schedule(
+ String scheduleId,
+ String projectId,
+ String flowId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime)
+ {
+ logger.info("Scheduling flow '" + scheduleId + "' for "
+ + _dateFormat.print(firstSchedTime));
+ schedule(new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime,
+ firstSchedTime));
+ }
+
+ /**
+ * Schedules the flow, but doesn't save the schedule afterwards.
+ *
+ * @param flow
+ */
+ private synchronized void internalSchedule(ScheduledFlow flow) {
+ ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
+ flow.updateTime();
+ if (existing != null) {
+ this.runner.removeScheduledFlow(existing);
+ }
+
this.runner.addScheduledFlow(flow);
- scheduleIDMap.put(flow.getScheduleId(), flow);
- }
-
- /**
- * Adds a flow to the schedule.
- *
- * @param flow
- */
- public synchronized void schedule(ScheduledFlow flow) {
- internalSchedule(flow);
- saveSchedule();
- }
-
- /**
- * Save the schedule
- */
- private void saveSchedule() {
- loader.saveSchedule(getSchedule());
- }
-
-
- /**
- * Thread that simply invokes the running of flows when the schedule is
- * ready.
- *
- * @author Richard Park
- *
- */
- public class ScheduleRunner extends Thread {
- private final PriorityBlockingQueue<ScheduledFlow> schedule;
- private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
- // Five minute minimum intervals
- private static final int TIMEOUT_MS = 300000;
-
- public ScheduleRunner() {
- schedule = new PriorityBlockingQueue<ScheduledFlow>(1, new ScheduleComparator());
- }
-
- public void shutdown() {
- logger.error("Shutting down scheduler thread");
- stillAlive.set(false);
- this.interrupt();
- }
-
- /**
- * Return a list of scheduled flow
- * @return
- */
- public synchronized List<ScheduledFlow> getSchedule() {
- return new ArrayList<ScheduledFlow>(schedule);
- }
-
- /**
- * Adds the flow to the schedule and then interrupts so it will update its wait time.
- * @param flow
- */
- public synchronized void addScheduledFlow(ScheduledFlow flow) {
- logger.info("Adding " + flow + " to schedule.");
- schedule.add(flow);
-// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-// System.currentTimeMillis(),
-// WorkflowAction.SCHEDULE_WORKFLOW,
-// WorkflowState.NOP,
-// flow.getId());
-
- this.interrupt();
- }
-
- /**
- * Remove scheduled flows. Does not interrupt.
- *
- * @param flow
- */
- public synchronized void removeScheduledFlow(ScheduledFlow flow) {
- logger.info("Removing " + flow + " from the schedule.");
- schedule.remove(flow);
-// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-// System.currentTimeMillis(),
-// WorkflowAction.UNSCHEDULE_WORKFLOW,
-// WorkflowState.NOP,
-// flow.getId());
- // Don't need to interrupt, because if this is originally on the top of the queue,
- // it'll just skip it.
- }
-
- public void run() {
- while(stillAlive.get()) {
- synchronized (this) {
- try {
- //TODO clear up the exception handling
- ScheduledFlow schedFlow = schedule.peek();
-
- if (schedFlow == null) {
- // If null, wake up every minute or so to see if there's something to do.
- // Most likely there will not be.
- try {
- this.wait(TIMEOUT_MS);
- } catch (InterruptedException e) {
- // interruption should occur when items are added or removed from the queue.
- }
- }
- else {
- // We've passed the flow execution time, so we will run.
- if (!schedFlow.getNextExecTime().isAfterNow()) {
- // Run flow. The invocation of flows should be quick.
- ScheduledFlow runningFlow = schedule.poll();
- logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());
-
- // Execute the flow here
- try {
- Project project = projectManager.getProject(runningFlow.getProjectId());
- if (project == null) {
- logger.error("Scheduled Project "+runningFlow.getProjectId()+" does not exist!");
- throw new RuntimeException("Error finding the scheduled project. " + runningFlow.getScheduleId());
- }
-
- Flow flow = project.getFlow(runningFlow.getFlowId());
- if (flow == null) {
- logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
- throw new RuntimeException("Error finding the scheduled flow. " + runningFlow.getScheduleId());
- }
-
- HashMap<String, Props> sources;
- try {
- sources = projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
- }
- catch (ProjectManagerException e) {
- logger.error(e.getMessage());
- throw new RuntimeException("Error getting the flow resources. " + runningFlow.getScheduleId());
- }
-
- // Create ExecutableFlow
- ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
- exflow.setSubmitUser(runningFlow.getUser());
- //TODO make disabled in scheduled flow
-// Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
-// for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
-// boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-// exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
-// }
-
- // Create directory
- try {
- executorManager.setupExecutableFlow(exflow);
- } catch (ExecutorManagerException e) {
- try {
- executorManager.cleanupAll(exflow);
- } catch (ExecutorManagerException e1) {
- e1.printStackTrace();
- }
- logger.error(e.getMessage());
- return;
- }
-
- // Copy files to the source.
- File executionDir = new File(exflow.getExecutionPath());
- try {
- projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
- } catch (ProjectManagerException e) {
- try {
- executorManager.cleanupAll(exflow);
- } catch (ExecutorManagerException e1) {
- e1.printStackTrace();
- }
- logger.error(e.getMessage());
- return;
- }
-
-
- try {
- executorManager.executeFlow(exflow);
- } catch (ExecutorManagerException e) {
- try {
- executorManager.cleanupAll(exflow);
- } catch (ExecutorManagerException e1) {
- e1.printStackTrace();
- }
-
- logger.error(e.getMessage());
- return;
- }
- } catch (JobExecutionException e) {
- logger.info("Could not run flow. " + e.getMessage());
- }
- schedule.remove(runningFlow);
-
- // Immediately reschedule if it's possible. Let the execution manager
- // handle any duplicate runs.
- if (runningFlow.updateTime()) {
- schedule.add(runningFlow);
- }
- saveSchedule();
- }
- else {
- // wait until flow run
- long millisWait = Math.max(0, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
- try {
- this.wait(Math.min(millisWait, TIMEOUT_MS));
- } catch (InterruptedException e) {
- // interruption should occur when items are added or removed from the queue.
- }
- }
- }
- }
- catch (Exception e) {
- logger.error("Unexpected exception has been thrown in scheduler", e);
- }
- catch (Throwable e) {
- logger.error("Unexpected throwable has been thrown in scheduler", e);
- }
- }
- }
- }
-
- /**
- * Class to sort the schedule based on time.
- *
- * @author Richard Park
- */
- private class ScheduleComparator implements Comparator<ScheduledFlow>{
- @Override
- public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
- DateTime first = arg1.getNextExecTime();
- DateTime second = arg0.getNextExecTime();
-
- if (first.isEqual(second)) {
- return 0;
- }
- else if (first.isBefore(second)) {
- return 1;
- }
-
- return -1;
- }
- }
- }
+ scheduleIDMap.put(flow.getScheduleId(), flow);
+ }
+
+ /**
+ * Adds a flow to the schedule.
+ *
+ * @param flow
+ */
+ public synchronized void schedule(ScheduledFlow flow) {
+ internalSchedule(flow);
+ saveSchedule();
+ }
+
+ /**
+ * Save the schedule
+ */
+ private void saveSchedule() {
+ loader.saveSchedule(getSchedule());
+ }
+
+ /**
+ * Thread that simply invokes the running of flows when the schedule is
+ * ready.
+ *
+ * @author Richard Park
+ *
+ */
+ public class ScheduleRunner extends Thread {
+ private final PriorityBlockingQueue<ScheduledFlow> schedule;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 300000;
+
+ public ScheduleRunner() {
+ schedule = new PriorityBlockingQueue<ScheduledFlow>(1,
+ new ScheduleComparator());
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down scheduler thread");
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ /**
+ * Return a list of scheduled flow
+ *
+ * @return
+ */
+ public synchronized List<ScheduledFlow> getSchedule() {
+ return new ArrayList<ScheduledFlow>(schedule);
+ }
+
+ /**
+ * Adds the flow to the schedule and then interrupts so it will update
+ * its wait time.
+ *
+ * @param flow
+ */
+ public synchronized void addScheduledFlow(ScheduledFlow flow) {
+ logger.info("Adding " + flow + " to schedule.");
+ schedule.add(flow);
+ // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+ // System.currentTimeMillis(),
+ // WorkflowAction.SCHEDULE_WORKFLOW,
+ // WorkflowState.NOP,
+ // flow.getId());
+
+ this.interrupt();
+ }
+
+ /**
+ * Remove scheduled flows. Does not interrupt.
+ *
+ * @param flow
+ */
+ public synchronized void removeScheduledFlow(ScheduledFlow flow) {
+ logger.info("Removing " + flow + " from the schedule.");
+ schedule.remove(flow);
+ // MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+ // System.currentTimeMillis(),
+ // WorkflowAction.UNSCHEDULE_WORKFLOW,
+ // WorkflowState.NOP,
+ // flow.getId());
+ // Don't need to interrupt, because if this is originally on the top
+ // of the queue,
+ // it'll just skip it.
+ }
+
+ public void run() {
+ while (stillAlive.get()) {
+ synchronized (this) {
+ try {
+ // TODO clear up the exception handling
+ ScheduledFlow schedFlow = schedule.peek();
+
+ if (schedFlow == null) {
+ // If null, wake up every minute or so to see if
+ // there's something to do.
+ // Most likely there will not be.
+ try {
+ this.wait(TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // interruption should occur when items are
+ // added or removed from the queue.
+ }
+ } else {
+ // We've passed the flow execution time, so we will
+ // run.
+ if (!schedFlow.getNextExecTime().isAfterNow()) {
+ // Run flow. The invocation of flows should be
+ // quick.
+ ScheduledFlow runningFlow = schedule.poll();
+ logger.info("Scheduler attempting to run "
+ + runningFlow.getScheduleId());
+
+ // Execute the flow here
+ try {
+ Project project = projectManager
+ .getProject(runningFlow
+ .getProjectId());
+ if (project == null) {
+ logger.error("Scheduled Project "
+ + runningFlow.getProjectId()
+ + " does not exist!");
+ throw new RuntimeException(
+ "Error finding the scheduled project. "
+ + runningFlow
+ .getScheduleId());
+ }
+
+ Flow flow = project.getFlow(runningFlow
+ .getFlowId());
+ if (flow == null) {
+ logger.error("Flow "
+ + runningFlow.getFlowId()
+ + " cannot be found in project "
+ + project.getName());
+ throw new RuntimeException(
+ "Error finding the scheduled flow. "
+ + runningFlow
+ .getScheduleId());
+ }
+
+ HashMap<String, Props> sources;
+ try {
+ sources = projectManager
+ .getAllFlowProperties(project,
+ runningFlow.getFlowId());
+ } catch (ProjectManagerException e) {
+ logger.error(e.getMessage());
+ throw new RuntimeException(
+ "Error getting the flow resources. "
+ + runningFlow
+ .getScheduleId());
+ }
+
+ // Create ExecutableFlow
+ ExecutableFlow exflow = executorManager
+ .createExecutableFlow(flow);
+ exflow.setSubmitUser(runningFlow.getUser());
+ // TODO make disabled in scheduled flow
+ // Map<String, String> paramGroup =
+ // this.getParamGroup(req, "disabled");
+ // for (Map.Entry<String, String> entry:
+ // paramGroup.entrySet()) {
+ // boolean nodeDisabled =
+ // Boolean.parseBoolean(entry.getValue());
+ // exflow.setStatus(entry.getKey(),
+ // nodeDisabled ? Status.DISABLED :
+ // Status.READY);
+ // }
+
+ // Create directory
+ try {
+ executorManager
+ .setupExecutableFlow(exflow);
+ } catch (ExecutorManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+ logger.error(e.getMessage());
+ return;
+ }
+
+ // Copy files to the source.
+ File executionDir = new File(
+ exflow.getExecutionPath());
+ try {
+ projectManager
+ .copyProjectSourceFilesToDirectory(
+ project, executionDir);
+ } catch (ProjectManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+ logger.error(e.getMessage());
+ return;
+ }
+
+ try {
+ executorManager.executeFlow(exflow);
+ } catch (ExecutorManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+
+ logger.error(e.getMessage());
+ return;
+ }
+ } catch (JobExecutionException e) {
+ logger.info("Could not run flow. "
+ + e.getMessage());
+ }
+ schedule.remove(runningFlow);
+
+ // Immediately reschedule if it's possible. Let
+ // the execution manager
+ // handle any duplicate runs.
+ if (runningFlow.updateTime()) {
+ schedule.add(runningFlow);
+ }
+ saveSchedule();
+ } else {
+ // wait until flow run
+ long millisWait = Math.max(0, schedFlow
+ .getNextExecTime().getMillis()
+ - (new DateTime()).getMillis());
+ try {
+ this.wait(Math.min(millisWait, TIMEOUT_MS));
+ } catch (InterruptedException e) {
+ // interruption should occur when items are
+ // added or removed from the queue.
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Unexpected exception has been thrown in scheduler",
+ e);
+ } catch (Throwable e) {
+ logger.error(
+ "Unexpected throwable has been thrown in scheduler",
+ e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Class to sort the schedule based on time.
+ *
+ * @author Richard Park
+ */
+ private class ScheduleComparator implements Comparator<ScheduledFlow> {
+ @Override
+ public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
+ DateTime first = arg1.getNextExecTime();
+ DateTime second = arg0.getNextExecTime();
+
+ if (first.isEqual(second)) {
+ return 0;
+ } else if (first.isBefore(second)) {
+ return 1;
+ }
+
+ return -1;
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 83d394a..f3fb647 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -442,7 +442,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("error", e.getMessage());
return;
}
-
try {
executorManager.executeFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 648333b..6d54af0 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -37,6 +37,8 @@ import azkaban.flow.Node;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduledFlow;
import azkaban.user.Permission;
import azkaban.user.UserManager;
import azkaban.user.Permission.Type;
@@ -55,6 +57,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private ProjectManager projectManager;
private ExecutorManager executorManager;
+ private ScheduleManager scheduleManager;
private MultipartParser multipartParser;
private File tempDir;
private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
@@ -69,6 +72,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
super.init(config);
projectManager = this.getApplication().getProjectManager();
executorManager = this.getApplication().getExecutorManager();
+ scheduleManager = this.getApplication().getScheduleManager();
tempDir = this.getApplication().getTempDirectory();
multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
@@ -89,6 +93,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
else if (hasParam(req, "flow")) {
handleFlowPage(req, resp, session);
}
+ else if (hasParam(req, "delete")) {
+ handleRemoveProject(req, resp, session);
+ }
else {
handleProjectPage(req, resp, session);
}
@@ -220,6 +227,63 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("executions", history);
}
+ private void handleRemoveProject(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ User user = session.getUser();
+ String projectName = getParam(req, "project");
+
+ Project project = projectManager.getProject(projectName);
+ if (project == null) {
+ this.setErrorMessageInCookie(resp, "Project " + projectName + " doesn't exist.");
+ resp.sendRedirect(req.getContextPath());
+ return;
+ }
+
+ if (!project.hasPermission(user, Type.ADMIN)) {
+ this.setErrorMessageInCookie(resp, "Cannot delete. User '" + user.getUserId() + "' is not an ADMIN.");
+ resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+ return;
+ }
+
+ // Check if scheduled
+ ScheduledFlow sflow = null;
+ for (ScheduledFlow flow: scheduleManager.getSchedule()) {
+ if (flow.getProjectId().equals(projectName)) {
+ sflow = flow;
+ break;
+ }
+ }
+ if (sflow != null) {
+ this.setErrorMessageInCookie(resp, "Cannot delete. Please unschedule " + sflow.getScheduleId() + ".");
+ resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+ return;
+ }
+
+ // Check if executing
+ ExecutableFlow exflow = null;
+ for (ExecutableFlow flow: executorManager.getRunningFlows()) {
+ if (flow.getProjectId() == projectName) {
+ exflow = flow;
+ break;
+ }
+ }
+ if (exflow != null) {
+ this.setErrorMessageInCookie(resp, "Cannot delete. Executable flow " + exflow.getExecutionId() + " is still running.");
+ resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+ return;
+ }
+
+ try {
+ projectManager.removeProject(projectName);
+ } catch (ProjectManagerException e) {
+ this.setErrorMessageInCookie(resp, e.getMessage());
+ resp.sendRedirect(req.getRequestURI() + "?project=" + projectName);
+ return;
+ }
+
+ this.setSuccessMessageInCookie(resp, "Project '" + projectName + "' was successfully deleted.");
+ resp.sendRedirect(req.getContextPath());
+ }
+
private void ajaxChangeDescription(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
String description = getParam(req, "description");
project.setDescription(description);
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 59b931c..a05ca60 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -178,10 +178,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
DateTime submitTime = new DateTime();
DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
- scheduleManager.schedule(scheduleId,userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
+ scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
-
-
ret.put("status", "success");
ret.put("message", scheduleId + " scheduled.");
diff --git a/src/java/azkaban/webapp/servlet/velocity/index.vm b/src/java/azkaban/webapp/servlet/velocity/index.vm
index 0c1555d..f7c3944 100644
--- a/src/java/azkaban/webapp/servlet/velocity/index.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/index.vm
@@ -23,6 +23,11 @@
<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
<div class="content">
+ #if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+ #elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+ #end
<div id="all-jobs-content">
<div class="section-hd">
#if ($allProjects)
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 97fe423..cd2695f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -49,6 +49,10 @@
<tr><td class="first">Project Admins:</td><td>$admins</td></tr>
<tr><td class="first">Your Permissions:</td><td>$userpermission.toString()</td></tr>
</table>
+ #if($admin)
+ <div id="deleteProject" class="btn6">Delete Project</div>
+ #end
+
</div>
<div id="project-summary">
@@ -59,7 +63,7 @@
<tr><td class="first">Modified by:</td><td>$project.lastModifiedUser</td></tr>
<tr><td class="first">Description:</td><td id="pdescription">$project.description</td>
#if($admin)
- <td><div id="edit" class="btn5">Edit</div></td>
+ <td><div id="edit" class="btn5">Edit Description</div></td>
#end
</tr>
</table>
@@ -131,6 +135,22 @@
</div>
</div>
</div>
+ <div id="delete-project" class="modal">
+ <h3>Delete Project</h3>
+ <div class="warn">
+ <div class="warning-icon"></div>
+ <div class="warning-message"><p>Warning: This project will be deleted and may not be recoverable.</p></div>
+ </div>
+ <form id="delete-form">
+ <input type="hidden" name="project" value="$project.name" />
+ <input type="hidden" name="delete" value="true" />
+ </form>
+
+ <div class="actions">
+ <a class="no simplemodal-close btn3" href="#">Cancel</a>
+ <a class="yes btn6" id="delete-btn" href="#">Yes</a>
+ </div>
+ </div>
</body>
</html>
src/web/css/azkaban.css 40(+32 -8)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 449e05d..2679b52 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -420,10 +420,10 @@ tr:hover td {
background: linear-gradient(to bottom, #f2f5f6 0%,#e3eaed 37%,#c8d7dc 100%); /* W3C */
filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#f2f5f6', endColorstr='#c8d7dc',GradientType=0 ); /* IE6-9 */
- border-color: #9BA7AA;
- color: #61696B;
- display: inline-block;
- font-weight: bold;
+ border-color: #9BA7AA;
+ color: #61696B;
+ display: inline-block;
+ font-weight: bold;
}
.btn7:hover {
background: #fcfeff; /* Old browsers */
@@ -621,7 +621,8 @@ tr:hover td {
}
.modal .btn2,
-.modal .btn3 {
+.modal .btn3,
+.modal .btn6 {
float: right;
margin-left: 10px;
}
@@ -968,6 +969,12 @@ tr:hover td {
width: 30%;
}
+#deleteProject {
+ width: 100px;
+ text-align: center;
+ margin-top: 10px;
+}
+
#job-summary {
margin-left: 30px;
margin-bottom: 10px;
@@ -995,7 +1002,7 @@ tr:hover td {
border-width: 0px;
}
.summary-table .first {
- width: 100px;
+ min-width: 100px;
font-weight: bold;
}
@@ -1379,7 +1386,7 @@ table.parameters tr td {
}
#edit {
- width: 60px;
+ width: 90px;
text-align: center;
}
@@ -1753,8 +1760,25 @@ ul.disableMenu {
margin-left: 15px;
}
-/* old styles */
+.warn {
+ margin-left: 30px;
+ height: 40px;
+}
+
+.warning-icon {
+ float: left;
+ background-image: url("./images/redwarning.png");
+ width: 48px;
+ height: 43px;
+}
+
+.warning-message {
+ float: left;
+ margin-left: 10px;
+ margin-top: 10px;
+}
+/* old styles */
.azkaban-charts .hitarea {
background-image: url("../../js/jqueryui/themes/custom-theme/images/ui-icons_cccccc_256x240.png");
background-position: 0 -16px;
src/web/css/images/redwarning.png 0(+0 -0)
diff --git a/src/web/css/images/redwarning.png b/src/web/css/images/redwarning.png
new file mode 100644
index 0000000..08c5fe6
Binary files /dev/null and b/src/web/css/images/redwarning.png differ
src/web/js/azkaban.project.view.js 39(+34 -5)
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index 0365e80..8694413 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -6,7 +6,6 @@ azkaban.ProjectView= Backbone.View.extend({
"click #project-upload-btn":"handleUploadProjectJob"
},
initialize : function(settings) {
-
},
handleUploadProjectJob : function(evt) {
console.log("click upload project");
@@ -43,6 +42,20 @@ azkaban.UploadProjectView= Backbone.View.extend({
}
});
+var deleteProjectView;
+azkaban.DeleteProjectView= Backbone.View.extend({
+ events : {
+ "click #delete-btn": "handleDeleteProject"
+ },
+ initialize : function(settings) {
+ },
+ handleDeleteProject : function(evt) {
+ $("#delete-form").submit();
+ },
+ render: function() {
+ }
+});
+
var flowTableView;
azkaban.FlowTableView= Backbone.View.extend({
events : {
@@ -166,7 +179,7 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
var editText = $("#edit").text();
var descriptionTD = $('#pdescription');
- if (editText != "Edit") {
+ if (editText != "Edit Description") {
var requestURL = contextURL + "/manager";
var newText = $("#descEdit").val();
@@ -184,10 +197,9 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
$(descriptionTD).remove("#descEdit");
$(descriptionTD).text(newText);
- $("#edit").text("Edit");
+ $("#edit").text("Edit Description");
}
else {
-
var text = $(descriptionTD).text();
var edit = document.createElement("textarea");
@@ -204,12 +216,29 @@ azkaban.ProjectSummaryView= Backbone.View.extend({
}
});
-
$(function() {
projectView = new azkaban.ProjectView({el:$('#all-jobs-content')});
uploadView = new azkaban.UploadProjectView({el:$('#upload-project')});
flowTableView = new azkaban.FlowTableView({el:$('#flow-tabs')});
projectSummary = new azkaban.ProjectSummaryView({el:$('#project-summary')});
+ deleteProjectView = new azkaban.DeleteProjectView({el: $('#delete-project')});
// Setting up the project tabs
+ $('#deleteProject').click(function() {
+ $('#delete-project').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ var modal = this;
+ $("#errorMsg").hide();
+ }
+ });
+ }
+ );
+
});
diff --git a/unit/java/azkaban/scheduler/MockLoader.java b/unit/java/azkaban/scheduler/MockLoader.java
index 0d1f5eb..867b502 100644
--- a/unit/java/azkaban/scheduler/MockLoader.java
+++ b/unit/java/azkaban/scheduler/MockLoader.java
@@ -9,8 +9,8 @@ import org.joda.time.Period;
public class MockLoader implements ScheduleLoader {
private ArrayList<ScheduledFlow> scheduledFlow = new ArrayList<ScheduledFlow>();
- public void addScheduledFlow(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExec, Period recurrence) {
- ScheduledFlow flow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExec, recurrence);
+ public void addScheduledFlow(String scheduleId, String projectId, String flowId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExec, Period recurrence) {
+ ScheduledFlow flow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime, nextExec, recurrence);
addScheduleFlow(flow);
}