diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 72c8e3d..6cd9d48 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -108,7 +108,7 @@ public class ExecutorManager {
public boolean isFlowRunning(int projectId, String flowId) {
for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
- if (ref.getSecond().getFlowId().equals(flowId)) {
+ if ((ref.getSecond().getProjectId() == projectId) && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
}
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index bf27ac7..18d4790 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -345,6 +345,7 @@ public class ScheduleManager {
// If null, wake up every minute or so to see if
// there's something to do. Most likely there will not be.
try {
+ logger.info("Nothing scheduled to run. Checking again soon.");
nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
@@ -356,11 +357,12 @@ public class ScheduleManager {
// Run flow. The invocation of flows should be quick.
Schedule runningSched = schedules.poll();
- logger.info("Scheduler attempting to run " + runningSched.getScheduleName() );
+ logger.info("Scheduler attempting to run " + runningSched.toString() );
// check if it is already running
if(!executorManager.isFlowRunning(runningSched.getProjectId(), runningSched.getFlowName()))
{
+ logger.info("Scheduler ready to run " + runningSched.toString());
// Execute the flow here
try {
Project project = projectManager.getProject(runningSched.getProjectId());
@@ -411,14 +413,14 @@ public class ScheduleManager {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
} catch (Exception e) {
- logger.error("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.");
- logger.error(e.getMessage());
- return;
+ e.printStackTrace();
+ throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
}
SlaOptions slaOptions = runningSched.getSlaOptions();
if(slaOptions != null) {
+ logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
// submit flow slas
List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
for(SlaSetting set : slaOptions.getSettings()) {
@@ -443,7 +445,6 @@ public class ScheduleManager {
removeRunnerSchedule(runningSched);
-
// Immediately reschedule if it's possible. Let
// the execution manager
// handle any duplicate runs.