azkaban-uncached

added scheduler logging fixed executor manager isFlowRunning

3/7/2013 4:39:37 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 72c8e3d..6cd9d48 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -108,7 +108,7 @@ public class ExecutorManager {
 	
 	public boolean isFlowRunning(int projectId, String flowId) {
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
-			if (ref.getSecond().getFlowId().equals(flowId)) {
+			if ((ref.getSecond().getProjectId() == projectId) && ref.getSecond().getFlowId().equals(flowId)) {
 				return true;
 			}
 		}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index bf27ac7..18d4790 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -345,6 +345,7 @@ public class ScheduleManager {
 							// If null, wake up every minute or so to see if
 							// there's something to do. Most likely there will not be.
 							try {
+								logger.info("Nothing scheduled to run. Checking again soon.");
 								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
@@ -356,11 +357,12 @@ public class ScheduleManager {
 								// Run flow. The invocation of flows should be quick.
 								Schedule runningSched = schedules.poll();
 								
-								logger.info("Scheduler attempting to run " + runningSched.getScheduleName() );
+								logger.info("Scheduler attempting to run " + runningSched.toString() );
 								
 								// check if it is already running
 								if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
 								{
+									logger.info("Scheduler ready to run " + runningSched.toString());
 									// Execute the flow here
 									try {
 										Project project = projectManager.getProject(runningSched.getProjectId());
@@ -411,14 +413,14 @@ public class ScheduleManager {
 											executorManager.submitExecutableFlow(exflow);
 											logger.info("Scheduler has invoked " + exflow.getExecutionId());
 										} catch (Exception e) {	
-											logger.error("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
-											logger.error(e.getMessage());
-											return;
+											e.printStackTrace();
+											throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
 										}
 										
 										SlaOptions slaOptions = runningSched.getSlaOptions();
 										
 										if(slaOptions != null) {
+											logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
 											// submit flow slas
 											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
 											for(SlaSetting set : slaOptions.getSettings()) {
@@ -443,7 +445,6 @@ public class ScheduleManager {
 								
 								removeRunnerSchedule(runningSched);
 
-
 								// Immediately reschedule if it's possible. Let
 								// the execution manager
 								// handle any duplicate runs.