azkaban-memoizeit

Adding default concurrent execution option due to backwards

4/10/2013 4:15:09 PM

Details

diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 83d982c..495ce3c 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -12,6 +12,9 @@ import java.util.Set;
  * Execution options for submitted flows and scheduled flows
  */
 public class ExecutionOptions {
+	public static final String CONCURRENT_OPTION_SKIP="skip";
+	public static final String CONCURRENT_OPTION_PIPELINE="pipeline";
+	public static final String CONCURRENT_OPTION_IGNORE="ignore";
 	
 	private boolean notifyOnFirstFailure = true;
 	private boolean notifyOnLastFailure = false;
@@ -20,8 +23,8 @@ public class ExecutionOptions {
 	
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
-	private Integer queueLevel = null;
-	private String concurrentOption = null;
+	private Integer queueLevel = 0;
+	private String concurrentOption = CONCURRENT_OPTION_IGNORE;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
 	
 	public enum FailureAction {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5f7e532..e2319c7 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -364,14 +364,14 @@ public class ExecutorManager {
 			
 			String message = "";
 			if (!running.isEmpty()) {
-				if (options.getConcurrentOption().equals("pipeline")) {
+				if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
 					Collections.sort(running);
 					Integer runningExecId = running.get(running.size() - 1);
 					
 					options.setPipelineExecutionId(runningExecId);
 					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
 				}
-				else if (options.getConcurrentOption().equals("skip")) {
+				else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
 					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
 				}
 				else {
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 3e84c86..b27cc96 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -347,9 +347,11 @@ public class Schedule{
 		else if (schedObj.containsKey("flowOptions")){
 			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
 			this.executionOptions = execOptions;
+			execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 		}
 		else {
 			this.executionOptions = new ExecutionOptions();
+			this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 		}
 
 		if (schedObj.containsKey("slaOptions")) {
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 586bfab..2733b3e 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -350,72 +350,66 @@ public class ScheduleManager {
 							if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
-								
-								logger.info("Scheduler attempting to run " + runningSched.toString() );
-								
-								// check if it is already running
-								if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
-								{
-									logger.info("Scheduler ready to run " + runningSched.toString());
-									// Execute the flow here
+
+								logger.info("Scheduler ready to run " + runningSched.toString());
+								// Execute the flow here
+								try {
+									Project project = projectManager.getProject(runningSched.getProjectId());
+									if (project == null) {
+										logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
+										throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
+									}	
+									//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
+
+									Flow flow = project.getFlow(runningSched.getFlowName());
+									if (flow == null) {
+										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
+										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
+									}
+
+									// Create ExecutableFlow
+									ExecutableFlow exflow = new ExecutableFlow(flow);
+									exflow.setSubmitUser(runningSched.getSubmitUser());
+									exflow.setProxyUsers(project.getProxyUsers());
+									
+									ExecutionOptions flowOptions = runningSched.getExecutionOptions();
+									if(flowOptions == null) {
+										flowOptions = new ExecutionOptions();
+										flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+									}
+									exflow.setExecutionOptions(flowOptions);
+									
 									try {
-										Project project = projectManager.getProject(runningSched.getProjectId());
-										if (project == null) {
-											logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
-											throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
-										}	
-										//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-	
-										Flow flow = project.getFlow(runningSched.getFlowName());
-										if (flow == null) {
-											logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
-											throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
-										}
-	
-										// Create ExecutableFlow
-										ExecutableFlow exflow = new ExecutableFlow(flow);
-										exflow.setSubmitUser(runningSched.getSubmitUser());
-										exflow.setProxyUsers(project.getProxyUsers());
-										
-										ExecutionOptions flowOptions = runningSched.getExecutionOptions();
-										if(flowOptions == null) {
-											flowOptions = new ExecutionOptions();
-										}
-										exflow.setExecutionOptions(flowOptions);
-										
-										try {
-											executorManager.submitExecutableFlow(exflow);
-											logger.info("Scheduler has invoked " + exflow.getExecutionId());
-										} catch (Exception e) {	
-											e.printStackTrace();
-											throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
-										}
-										
-										SlaOptions slaOptions = runningSched.getSlaOptions();
-										if(slaOptions != null) {
-											logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-											// submit flow slas
-											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-											for(SlaSetting set : slaOptions.getSettings()) {
-												if(set.getId().equals("")) {
-													DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-													slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-												}
-												else {
-													jobsettings.add(set);
-												}
+										executorManager.submitExecutableFlow(exflow);
+										logger.info("Scheduler has invoked " + exflow.getExecutionId());
+									} catch (Exception e) {	
+										e.printStackTrace();
+										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
+									}
+									
+									SlaOptions slaOptions = runningSched.getSlaOptions();
+									if(slaOptions != null) {
+										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+										// submit flow slas
+										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+										for(SlaSetting set : slaOptions.getSettings()) {
+											if(set.getId().equals("")) {
+												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
 											}
-											if(jobsettings.size() > 0) {
-												slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+											else {
+												jobsettings.add(set);
 											}
 										}
-										
-									} catch (Exception e) {
-										logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
+										if(jobsettings.size() > 0) {
+											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+										}
 									}
 									
+								} catch (Exception e) {
+									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 								}
-								
+
 								removeRunnerSchedule(runningSched);
 
 								// Immediately reschedule if it's possible. Let