azkaban-memoizeit

Merge branch 'release-2.1'

4/10/2013 4:15:42 PM

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index b9a00e2..d342f0f 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -224,7 +224,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				continue;
 			}
 			
-			if (flow.getUpdateTime() >= updateTime) {
+			if (flow.getUpdateTime() > updateTime) {
 				updateList.add(flow.toUpdateObject(updateTime));
 			}
 		}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index be562bd..fb4c349 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -69,6 +69,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
+	private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
 	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
@@ -300,6 +301,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 								try {
 									executorService.submit(runner);
 									jobRunners.put(node.getJobId(), runner);
+									activeJobRunners.put(node.getJobId(), runner);
 								} catch (RejectedExecutionException e) {
 									logger.error(e);
 								};
@@ -373,6 +375,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 
 	private boolean isFlowFinished() {
+		if (!activeJobRunners.isEmpty()) {
+			return false;
+		}
+		
 		for (String end: flow.getEndNodes()) {
 			ExecutableNode node = flow.getExecutableNode(end);
 			if (!Status.isStatusFinished(node.getStatus()) ) {
@@ -521,7 +527,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				watcher.stopWatcher();
 			}
 			
-			for (JobRunner runner : jobRunners.values()) {
+			for (JobRunner runner : activeJobRunners.values()) {
 				runner.cancel();
 			}
 			
@@ -672,7 +678,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (event.getType() == Type.JOB_FINISHED) {
 				synchronized(mainSyncObj) {
 					ExecutableNode node = runner.getNode();
-	
+					activeJobRunners.remove(node.getJobId());
+					
 					logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
 					if (runner.getOutputProps() != null) {
 						logger.info("Job " + node.getJobId() + " had output props.");
@@ -769,6 +776,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public int getNumRunningJobs() {
-		return jobRunners.size();
+		return activeJobRunners.size();
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 83d982c..495ce3c 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -12,6 +12,9 @@ import java.util.Set;
  * Execution options for submitted flows and scheduled flows
  */
 public class ExecutionOptions {
+	public static final String CONCURRENT_OPTION_SKIP="skip";
+	public static final String CONCURRENT_OPTION_PIPELINE="pipeline";
+	public static final String CONCURRENT_OPTION_IGNORE="ignore";
 	
 	private boolean notifyOnFirstFailure = true;
 	private boolean notifyOnLastFailure = false;
@@ -20,8 +23,8 @@ public class ExecutionOptions {
 	
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
-	private Integer queueLevel = null;
-	private String concurrentOption = null;
+	private Integer queueLevel = 0;
+	private String concurrentOption = CONCURRENT_OPTION_IGNORE;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
 	
 	public enum FailureAction {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index af3320b..ff94590 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -384,14 +384,14 @@ public class ExecutorManager {
 			
 			String message = "";
 			if (!running.isEmpty()) {
-				if (options.getConcurrentOption().equals("pipeline")) {
+				if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
 					Collections.sort(running);
 					Integer runningExecId = running.get(running.size() - 1);
 					
 					options.setPipelineExecutionId(runningExecId);
 					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
 				}
-				else if (options.getConcurrentOption().equals("skip")) {
+				else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
 					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
 				}
 				else {
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 3e84c86..b27cc96 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -347,9 +347,11 @@ public class Schedule{
 		else if (schedObj.containsKey("flowOptions")){
 			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
 			this.executionOptions = execOptions;
+			execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 		}
 		else {
 			this.executionOptions = new ExecutionOptions();
+			this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 		}
 
 		if (schedObj.containsKey("slaOptions")) {
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 586bfab..2733b3e 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -350,72 +350,66 @@ public class ScheduleManager {
 							if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
-								
-								logger.info("Scheduler attempting to run " + runningSched.toString() );
-								
-								// check if it is already running
-								if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
-								{
-									logger.info("Scheduler ready to run " + runningSched.toString());
-									// Execute the flow here
+
+								logger.info("Scheduler ready to run " + runningSched.toString());
+								// Execute the flow here
+								try {
+									Project project = projectManager.getProject(runningSched.getProjectId());
+									if (project == null) {
+										logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
+										throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
+									}	
+									//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
+
+									Flow flow = project.getFlow(runningSched.getFlowName());
+									if (flow == null) {
+										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
+										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
+									}
+
+									// Create ExecutableFlow
+									ExecutableFlow exflow = new ExecutableFlow(flow);
+									exflow.setSubmitUser(runningSched.getSubmitUser());
+									exflow.setProxyUsers(project.getProxyUsers());
+									
+									ExecutionOptions flowOptions = runningSched.getExecutionOptions();
+									if(flowOptions == null) {
+										flowOptions = new ExecutionOptions();
+										flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+									}
+									exflow.setExecutionOptions(flowOptions);
+									
 									try {
-										Project project = projectManager.getProject(runningSched.getProjectId());
-										if (project == null) {
-											logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
-											throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
-										}	
-										//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-	
-										Flow flow = project.getFlow(runningSched.getFlowName());
-										if (flow == null) {
-											logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
-											throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
-										}
-	
-										// Create ExecutableFlow
-										ExecutableFlow exflow = new ExecutableFlow(flow);
-										exflow.setSubmitUser(runningSched.getSubmitUser());
-										exflow.setProxyUsers(project.getProxyUsers());
-										
-										ExecutionOptions flowOptions = runningSched.getExecutionOptions();
-										if(flowOptions == null) {
-											flowOptions = new ExecutionOptions();
-										}
-										exflow.setExecutionOptions(flowOptions);
-										
-										try {
-											executorManager.submitExecutableFlow(exflow);
-											logger.info("Scheduler has invoked " + exflow.getExecutionId());
-										} catch (Exception e) {	
-											e.printStackTrace();
-											throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
-										}
-										
-										SlaOptions slaOptions = runningSched.getSlaOptions();
-										if(slaOptions != null) {
-											logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-											// submit flow slas
-											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-											for(SlaSetting set : slaOptions.getSettings()) {
-												if(set.getId().equals("")) {
-													DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-													slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-												}
-												else {
-													jobsettings.add(set);
-												}
+										executorManager.submitExecutableFlow(exflow);
+										logger.info("Scheduler has invoked " + exflow.getExecutionId());
+									} catch (Exception e) {	
+										e.printStackTrace();
+										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
+									}
+									
+									SlaOptions slaOptions = runningSched.getSlaOptions();
+									if(slaOptions != null) {
+										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+										// submit flow slas
+										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+										for(SlaSetting set : slaOptions.getSettings()) {
+											if(set.getId().equals("")) {
+												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
 											}
-											if(jobsettings.size() > 0) {
-												slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+											else {
+												jobsettings.add(set);
 											}
 										}
-										
-									} catch (Exception e) {
-										logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
+										if(jobsettings.size() > 0) {
+											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+										}
 									}
 									
+								} catch (Exception e) {
+									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 								}
-								
+
 								removeRunnerSchedule(runningSched);
 
 								// Immediately reschedule if it's possible. Let