azkaban-memoizeit
Changes
src/java/azkaban/scheduler/ScheduleManager.java 112(+53 -59)
Details
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 83d982c..495ce3c 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -12,6 +12,9 @@ import java.util.Set;
* Execution options for submitted flows and scheduled flows
*/
public class ExecutionOptions {
+ public static final String CONCURRENT_OPTION_SKIP="skip";
+ public static final String CONCURRENT_OPTION_PIPELINE="pipeline";
+ public static final String CONCURRENT_OPTION_IGNORE="ignore";
private boolean notifyOnFirstFailure = true;
private boolean notifyOnLastFailure = false;
@@ -20,8 +23,8 @@ public class ExecutionOptions {
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
- private Integer queueLevel = null;
- private String concurrentOption = null;
+ private Integer queueLevel = 0;
+ private String concurrentOption = CONCURRENT_OPTION_IGNORE;
private Map<String, String> flowParameters = new HashMap<String, String>();
public enum FailureAction {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5f7e532..e2319c7 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -364,14 +364,14 @@ public class ExecutorManager {
String message = "";
if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals("pipeline")) {
+ if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
Integer runningExecId = running.get(running.size() - 1);
options.setPipelineExecutionId(runningExecId);
message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
}
- else if (options.getConcurrentOption().equals("skip")) {
+ else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
}
else {
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 3e84c86..b27cc96 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -347,9 +347,11 @@ public class Schedule{
else if (schedObj.containsKey("flowOptions")){
ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
this.executionOptions = execOptions;
+ execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
}
else {
this.executionOptions = new ExecutionOptions();
+ this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
}
if (schedObj.containsKey("slaOptions")) {
src/java/azkaban/scheduler/ScheduleManager.java 112(+53 -59)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 586bfab..2733b3e 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -350,72 +350,66 @@ public class ScheduleManager {
if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
-
- logger.info("Scheduler attempting to run " + runningSched.toString() );
-
- // check if it is already running
- if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
- {
- logger.info("Scheduler ready to run " + runningSched.toString());
- // Execute the flow here
+
+ logger.info("Scheduler ready to run " + runningSched.toString());
+ // Execute the flow here
+ try {
+ Project project = projectManager.getProject(runningSched.getProjectId());
+ if (project == null) {
+ logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
+ throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
+ }
+ //TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
+
+ Flow flow = project.getFlow(runningSched.getFlowName());
+ if (flow == null) {
+ logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
+ throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
+ }
+
+ // Create ExecutableFlow
+ ExecutableFlow exflow = new ExecutableFlow(flow);
+ exflow.setSubmitUser(runningSched.getSubmitUser());
+ exflow.setProxyUsers(project.getProxyUsers());
+
+ ExecutionOptions flowOptions = runningSched.getExecutionOptions();
+ if(flowOptions == null) {
+ flowOptions = new ExecutionOptions();
+ flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ }
+ exflow.setExecutionOptions(flowOptions);
+
try {
- Project project = projectManager.getProject(runningSched.getProjectId());
- if (project == null) {
- logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
- throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
- }
- //TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-
- Flow flow = project.getFlow(runningSched.getFlowName());
- if (flow == null) {
- logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
- throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
- }
-
- // Create ExecutableFlow
- ExecutableFlow exflow = new ExecutableFlow(flow);
- exflow.setSubmitUser(runningSched.getSubmitUser());
- exflow.setProxyUsers(project.getProxyUsers());
-
- ExecutionOptions flowOptions = runningSched.getExecutionOptions();
- if(flowOptions == null) {
- flowOptions = new ExecutionOptions();
- }
- exflow.setExecutionOptions(flowOptions);
-
- try {
- executorManager.submitExecutableFlow(exflow);
- logger.info("Scheduler has invoked " + exflow.getExecutionId());
- } catch (Exception e) {
- e.printStackTrace();
- throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
- }
-
- SlaOptions slaOptions = runningSched.getSlaOptions();
- if(slaOptions != null) {
- logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
- // submit flow slas
- List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
- for(SlaSetting set : slaOptions.getSettings()) {
- if(set.getId().equals("")) {
- DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
- slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
- }
- else {
- jobsettings.add(set);
- }
+ executorManager.submitExecutableFlow(exflow);
+ logger.info("Scheduler has invoked " + exflow.getExecutionId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
+ }
+
+ SlaOptions slaOptions = runningSched.getSlaOptions();
+ if(slaOptions != null) {
+ logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+ // submit flow slas
+ List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+ for(SlaSetting set : slaOptions.getSettings()) {
+ if(set.getId().equals("")) {
+ DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+ slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
}
- if(jobsettings.size() > 0) {
- slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+ else {
+ jobsettings.add(set);
}
}
-
- } catch (Exception e) {
- logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
+ if(jobsettings.size() > 0) {
+ slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+ }
}
+ } catch (Exception e) {
+ logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
}
-
+
removeRunnerSchedule(runningSched);
// Immediately reschedule if it's possible. Let